diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index ad4834ce03..be9e5a1ef1 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -162,6 +162,8 @@ const EnvironmentSchema = z.object({ SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10), SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100), SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100), + SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS: z.coerce.number().int().default(1000), + SHARED_QUEUE_CONSUMER_RESOLVE_PAYLOADS_BATCH_SIZE: z.coerce.number().int().default(25), // Development OTEL environment variables DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(), @@ -219,6 +221,10 @@ const EnvironmentSchema = z.object({ .number() .int() .default(60 * 1000 * 15), + MARQS_SHARED_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(36), + MARQS_DEV_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(12), + MARQS_MAXIMUM_NACK_COUNT: z.coerce.number().int().default(64), + PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(), VERBOSE_GRAPHILE_LOGGING: z.string().default("false"), diff --git a/apps/webapp/app/models/taskQueue.server.ts b/apps/webapp/app/models/taskQueue.server.ts new file mode 100644 index 0000000000..a9c52caf9f --- /dev/null +++ b/apps/webapp/app/models/taskQueue.server.ts @@ -0,0 +1,63 @@ +import { QueueOptions } from "@trigger.dev/core/v3/schemas"; +import { TaskQueue } from "@trigger.dev/database"; +import { prisma } from "~/db.server"; + +export async function findQueueInEnvironment( + queueName: string, + environmentId: string, + backgroundWorkerTaskId?: string, + backgroundTask?: { queueConfig?: unknown } +): Promise<TaskQueue | undefined> { + const sanitizedQueueName = sanitizeQueueName(queueName); + + const queue = await prisma.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: environmentId, + name: sanitizedQueueName, + }, + }); + + if (queue) { + return queue; +
} + + const task = backgroundTask + ? backgroundTask + : backgroundWorkerTaskId + ? await prisma.backgroundWorkerTask.findFirst({ + where: { + id: backgroundWorkerTaskId, + }, + }) + : undefined; + + if (!task) { + return; + } + + const queueConfig = QueueOptions.safeParse(task.queueConfig); + + if (queueConfig.success) { + const taskQueueName = queueConfig.data.name + ? sanitizeQueueName(queueConfig.data.name) + : undefined; + + if (taskQueueName && taskQueueName !== sanitizedQueueName) { + const queue = await prisma.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: environmentId, + name: taskQueueName, + }, + }); + + if (queue) { + return queue; + } + } + } +} + +// Only allow alphanumeric characters, underscores, hyphens, and slashes (and only the first 128 characters) +export function sanitizeQueueName(queueName: string) { + return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128); +} diff --git a/apps/webapp/app/routes/admin.api.v1.marqs.ts b/apps/webapp/app/routes/admin.api.v1.marqs.ts deleted file mode 100644 index 14a9fd409e..0000000000 --- a/apps/webapp/app/routes/admin.api.v1.marqs.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { LoaderFunctionArgs, json } from "@remix-run/server-runtime"; -import { prisma } from "~/db.server"; -import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; -import { marqs } from "~/v3/marqs/index.server"; - -export async function loader({ request, params }: LoaderFunctionArgs) { - // Next authenticate the request - const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); - - if (!authenticationResult) { - return json({ error: "Invalid or Missing API key" }, { status: 401 }); - } - - const user = await prisma.user.findUnique({ - where: { - id: authenticationResult.userId, - }, - }); - - if (!user) { - return json({ error: "Invalid or Missing API key" }, { status: 401 }); - } - - if (!user.admin) { - return json({ error: "You must be an 
admin to perform this action" }, { status: 403 }); - } - - const details = await marqs?.getSharedQueueDetails(); - - return json(details); -} diff --git a/apps/webapp/app/services/apiAuth.server.ts b/apps/webapp/app/services/apiAuth.server.ts index dd973b257d..03c5f128ee 100644 --- a/apps/webapp/app/services/apiAuth.server.ts +++ b/apps/webapp/app/services/apiAuth.server.ts @@ -17,6 +17,7 @@ import { isPersonalAccessToken, } from "./personalAccessToken.server"; import { isPublicJWT, validatePublicJwtKey } from "./realtime/jwtAuth.server"; +import { RuntimeEnvironmentForEnvRepo } from "~/v3/environmentVariables/environmentVariablesRepository.server"; const ClaimsSchema = z.object({ scopes: z.array(z.string()).optional(), @@ -410,7 +411,7 @@ const JWT_ALGORITHM = "HS256"; const DEFAULT_JWT_EXPIRATION_IN_MS = 1000 * 60 * 60; // 1 hour export async function generateJWTTokenForEnvironment( - environment: RuntimeEnvironment, + environment: RuntimeEnvironmentForEnvRepo, payload: Record ) { const jwt = await new SignJWT({ diff --git a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts index 975ce231f1..b10a84d0e4 100644 --- a/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts +++ b/apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts @@ -654,9 +654,26 @@ export class EnvironmentVariablesRepository implements Repository { } } +export const RuntimeEnvironmentForEnvRepoPayload = { + select: { + id: true, + slug: true, + type: true, + projectId: true, + apiKey: true, + organizationId: true, + }, +} as const; + +export type RuntimeEnvironmentForEnvRepo = Prisma.RuntimeEnvironmentGetPayload< + typeof RuntimeEnvironmentForEnvRepoPayload +>; + export const environmentVariablesRepository = new EnvironmentVariablesRepository(); -export async function resolveVariablesForEnvironment(runtimeEnvironment: 
RuntimeEnvironment) { +export async function resolveVariablesForEnvironment( + runtimeEnvironment: RuntimeEnvironmentForEnvRepo +) { const projectSecrets = await environmentVariablesRepository.getEnvironmentVariables( runtimeEnvironment.projectId, runtimeEnvironment.id @@ -672,7 +689,9 @@ export async function resolveVariablesForEnvironment(runtimeEnvironment: Runtime return [...overridableTriggerVariables, ...projectSecrets, ...builtInVariables]; } -async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnvironment) { +async function resolveOverridableTriggerVariables( + runtimeEnvironment: RuntimeEnvironmentForEnvRepo +) { let result: Array = [ { key: "TRIGGER_REALTIME_STREAM_VERSION", @@ -683,7 +702,7 @@ async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnv return result; } -async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment) { +async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) { let result: Array = [ { key: "OTEL_EXPORTER_OTLP_ENDPOINT", @@ -745,7 +764,7 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment return [...result, ...commonVariables]; } -async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironment) { +async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) { let result: Array = [ { key: "TRIGGER_SECRET_KEY", @@ -838,7 +857,7 @@ async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmen } async function resolveCommonBuiltInVariables( - runtimeEnvironment: RuntimeEnvironment + runtimeEnvironment: RuntimeEnvironmentForEnvRepo ): Promise> { return []; } diff --git a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts index 8560312d70..a7506f3913 100644 --- a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts @@ 
-13,18 +13,15 @@ import { prisma } from "~/db.server"; import { createNewSession, disconnectSession } from "~/models/runtimeEnvironment.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; -import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; +import { marqs } from "~/v3/marqs/index.server"; import { resolveVariablesForEnvironment } from "../environmentVariables/environmentVariablesRepository.server"; import { FailedTaskRunService } from "../failedTaskRun.server"; import { CancelDevSessionRunsService } from "../services/cancelDevSessionRuns.server"; import { CompleteAttemptService } from "../services/completeAttempt.server"; -import { - SEMINTATTRS_FORCE_RECORDING, - attributesFromAuthenticatedEnv, - tracer, -} from "../tracer.server"; -import { DevSubscriber, devPubSub } from "./devPubSub.server"; +import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server"; import { getMaxDuration } from "../utils/maxDuration"; +import { DevSubscriber, devPubSub } from "./devPubSub.server"; +import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server"; const MessageBody = z.discriminatedUnion("type", [ z.object({ @@ -436,14 +433,12 @@ export class DevQueueConsumer { return; } - const queue = await prisma.taskQueue.findUnique({ - where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: this.env.id, - name: sanitizeQueueName(lockedTaskRun.queue), - }, - }, - }); + const queue = await findQueueInEnvironment( + lockedTaskRun.queue, + this.env.id, + backgroundTask.id, + backgroundTask + ); if (!queue) { logger.debug("[DevQueueConsumer] Failed to find queue", { diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 31b1bc2a9c..b7b249bbbb 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -1,11 +1,12 @@ import { + context, + propagation, Span, 
SpanKind, SpanOptions, - Tracer, - context, - propagation, + SpanStatusCode, trace, + Tracer, } from "@opentelemetry/api"; import { SEMATTRS_MESSAGE_ID, @@ -18,6 +19,7 @@ import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; +import { concurrencyTracker } from "../services/taskRunConcurrencyTracker.server"; import { attributesFromAuthenticatedEnv } from "../tracer.server"; import { AsyncWorker } from "./asyncWorker.server"; import { MarQSShortKeyProducer } from "./marqsKeyProducer.server"; @@ -29,10 +31,12 @@ import { MessageQueueSubscriber, QueueCapacities, QueueRange, + QueueWithScores, VisibilityTimeoutStrategy, } from "./types"; import { V3VisibilityTimeout } from "./v3VisibilityTimeout.server"; -import { concurrencyTracker } from "../services/taskRunConcurrencyTracker.server"; +import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } from "@unkey/cache"; +import { MemoryStore } from "@unkey/cache/stores"; const KEY_PREFIX = "marqs:"; @@ -41,10 +45,11 @@ const constants = { } as const; const SemanticAttributes = { - QUEUE: "marqs.queue", - PARENT_QUEUE: "marqs.parentQueue", - MESSAGE_ID: "marqs.messageId", - CONCURRENCY_KEY: "marqs.concurrencyKey", + CONSUMER_ID: "consumer_id", + QUEUE: "queue", + PARENT_QUEUE: "parent_queue", + MESSAGE_ID: "message_id", + CONCURRENCY_KEY: "concurrency_key", }; export type MarQSOptions = { @@ -60,6 +65,7 @@ export type MarQSOptions = { queuePriorityStrategy: MarQSQueuePriorityStrategy; envQueuePriorityStrategy: MarQSQueuePriorityStrategy; visibilityTimeoutStrategy: VisibilityTimeoutStrategy; + maximumNackCount: number; enableRebalancing?: boolean; verbose?: boolean; subscriber?: MessageQueueSubscriber; @@ -71,17 +77,50 @@ export type MarQSOptions = { export class MarQS { private redis: Redis; public keys: MarQSKeyProducer; - private queuePriorityStrategy: 
MarQSQueuePriorityStrategy; #rebalanceWorkers: Array = []; + private _cache: UnkeyCache<{ + childQueueSize: number; + queueConcurrencyLimit: number; + envAndOrgConcurrencyLimit: number; + disabledConcurrency: boolean; + }>; + + private _consumerQueues: Map> = new Map(); + constructor(private readonly options: MarQSOptions) { this.redis = new Redis(options.redis); this.keys = options.keysProducer; - this.queuePriorityStrategy = options.queuePriorityStrategy; this.#startRebalanceWorkers(); this.#registerCommands(); + + const ctx = new DefaultStatefulContext(); + const memory = new MemoryStore({ persistentMap: new Map() }); + + this._cache = createCache({ + childQueueSize: new Namespace(ctx, { + stores: [memory], + fresh: 5000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. + stale: 10_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. + }), + queueConcurrencyLimit: new Namespace(ctx, { + stores: [memory], + fresh: 5000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. + stale: 10_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. + }), + envAndOrgConcurrencyLimit: new Namespace(ctx, { + stores: [memory], + fresh: 60_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. + stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. + }), + disabledConcurrency: new Namespace(ctx, { + stores: [memory], + fresh: 30_000, // The time in milliseconds that a value is considered fresh. Cache hits within this time will return the cached value. 
+ stale: 180_000, // The time in milliseconds that a value is considered stale. Cache hits within this time will return the cached value and trigger a background refresh. + }), + }); } get name() { @@ -235,72 +274,86 @@ export class MarQS { async (span) => { const parentQueue = this.keys.envSharedQueueKey(env); - // Read the parent queue for matching queues - const messageQueue = await this.#getRandomQueueFromParentQueue( + span.setAttribute(SemanticAttributes.PARENT_QUEUE, parentQueue); + span.setAttribute(SemanticAttributes.CONSUMER_ID, env.id); + + const childQueues = await this.#allChildQueuesForConsumer(parentQueue, env.id); + + span.setAttribute("parent_queue_length", childQueues.length); + + if (childQueues.length === 0) { + return; + } + + // Get prioritized list of queues to try + const queues = await this.#getPrioritizedQueueCandidates( parentQueue, + childQueues, this.options.envQueuePriorityStrategy, (queue) => this.#calculateMessageQueueCapacities(queue, { checkForDisabled: false }), env.id ); - if (!messageQueue) { - return; - } + span.setAttribute("queue_count", queues.length); - const messageData = await this.#callDequeueMessage({ - messageQueue, - parentQueue, - visibilityQueue: constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE, - concurrencyLimitKey: this.keys.concurrencyLimitKeyFromQueue(messageQueue), - currentConcurrencyKey: this.keys.currentConcurrencyKeyFromQueue(messageQueue), - envConcurrencyLimitKey: this.keys.envConcurrencyLimitKeyFromQueue(messageQueue), - envCurrentConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue), - orgConcurrencyLimitKey: this.keys.orgConcurrencyLimitKeyFromQueue(messageQueue), - orgCurrentConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue), - }); + for (const messageQueue of queues) { + const messageData = await this.#callDequeueMessage({ + messageQueue, + parentQueue, + concurrencyLimitKey: this.keys.concurrencyLimitKeyFromQueue(messageQueue), + currentConcurrencyKey: 
this.keys.currentConcurrencyKeyFromQueue(messageQueue), + envConcurrencyLimitKey: this.keys.envConcurrencyLimitKeyFromQueue(messageQueue), + envCurrentConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue), + orgConcurrencyLimitKey: this.keys.orgConcurrencyLimitKeyFromQueue(messageQueue), + orgCurrentConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue), + }); - if (!messageData) { - return; - } + if (!messageData) { + return; + } - const message = await this.readMessage(messageData.messageId); + const message = await this.readMessage(messageData.messageId); - if (message) { - span.setAttributes({ - [SEMATTRS_MESSAGE_ID]: message.messageId, - [SemanticAttributes.QUEUE]: message.queue, - [SemanticAttributes.MESSAGE_ID]: message.messageId, - [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, - [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, - }); + if (message) { + span.setAttributes({ + [SEMATTRS_MESSAGE_ID]: message.messageId, + [SemanticAttributes.QUEUE]: message.queue, + [SemanticAttributes.MESSAGE_ID]: message.messageId, + [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, + [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, + attempted_queues: queues.indexOf(messageQueue) + 1, // How many queues we tried before success + }); - await this.options.subscriber?.messageDequeued(message); - } else { - logger.error(`Failed to read message, undoing the dequeueing of the message`, { - messageData, - service: this.name, - }); + await this.options.subscriber?.messageDequeued(message); + } else { + logger.error(`Failed to read message, undoing the dequeueing of the message`, { + messageData, + service: this.name, + }); - await this.#callAcknowledgeMessage({ - parentQueue, - messageKey: this.keys.messageKey(messageData.messageId), - messageQueue: messageQueue, - visibilityQueue: constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE, - concurrencyKey: this.keys.currentConcurrencyKeyFromQueue(messageQueue), - 
envConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue), - orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue), - messageId: messageData.messageId, - }); + await this.#callAcknowledgeMessage({ + parentQueue, + messageKey: this.keys.messageKey(messageData.messageId), + messageQueue: messageQueue, + concurrencyKey: this.keys.currentConcurrencyKeyFromQueue(messageQueue), + envConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue), + orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue), + messageId: messageData.messageId, + }); - return; - } + return; + } - await this.options.visibilityTimeoutStrategy.heartbeat( - messageData.messageId, - this.visibilityTimeoutInMs - ); + await this.options.visibilityTimeoutStrategy.heartbeat( + messageData.messageId, + this.visibilityTimeoutInMs + ); + + return message; + } - return message; + span.setAttribute("attempted_queues", queues.length); + return; }, { kind: SpanKind.CONSUMER, @@ -313,37 +366,6 @@ export class MarQS { ); } - public async getSharedQueueDetails() { - const parentQueue = this.keys.sharedQueueKey(); - - const { range } = await this.queuePriorityStrategy.nextCandidateSelection( - parentQueue, - "getSharedQueueDetails" - ); - const queues = await this.#getChildQueuesWithScores(parentQueue, range); - - const queuesWithScores = await this.#calculateQueueScores(queues, (queue) => - this.#calculateMessageQueueCapacities(queue) - ); - - // We need to priority shuffle here to ensure all workers aren't just working on the highest priority queue - const choice = this.queuePriorityStrategy.chooseQueue( - queuesWithScores, - parentQueue, - "getSharedQueueDetails", - range - ); - - return { - selectionId: "getSharedQueueDetails", - queues, - queuesWithScores, - nextRange: range, - queueCount: queues.length, - queueChoice: choice, - }; - } - /** * Dequeue a message from the shared queue (this should be used in production environments) */ 
@@ -351,57 +373,87 @@ export class MarQS { return this.#trace( "dequeueMessageInSharedQueue", async (span) => { + span.setAttribute(SemanticAttributes.CONSUMER_ID, consumerId); + const parentQueue = this.keys.sharedQueueKey(); - // Read the parent queue for matching queues - const messageQueue = await this.#getRandomQueueFromParentQueue( + span.setAttribute(SemanticAttributes.PARENT_QUEUE, parentQueue); + + const childQueues = await this.#allChildQueuesForConsumer(parentQueue, consumerId); + + span.setAttribute("parent_queue_length", childQueues.length); + + if (childQueues.length === 0) { + return; + } + + // Get prioritized list of queues to try + const queues = await this.#getPrioritizedQueueCandidates( parentQueue, + childQueues, this.options.queuePriorityStrategy, (queue) => this.#calculateMessageQueueCapacities(queue, { checkForDisabled: true }), consumerId ); - if (!messageQueue) { - return; - } + span.setAttribute("queue_count", queues.length); - // If the queue includes a concurrency key, we need to remove the ck:concurrencyKey from the queue name - const messageData = await this.#callDequeueMessage({ - messageQueue, - parentQueue, - visibilityQueue: constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE, - concurrencyLimitKey: this.keys.concurrencyLimitKeyFromQueue(messageQueue), - currentConcurrencyKey: this.keys.currentConcurrencyKeyFromQueue(messageQueue), - envConcurrencyLimitKey: this.keys.envConcurrencyLimitKeyFromQueue(messageQueue), - envCurrentConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue), - orgConcurrencyLimitKey: this.keys.orgConcurrencyLimitKeyFromQueue(messageQueue), - orgCurrentConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue), - }); - - if (!messageData) { + if (queues.length === 0) { return; } - const message = await this.readMessage(messageData.messageId); - - if (message) { - span.setAttributes({ - [SEMATTRS_MESSAGE_ID]: message.messageId, - [SemanticAttributes.QUEUE]: message.queue, - 
[SemanticAttributes.MESSAGE_ID]: message.messageId, - [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, - [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, - }); + // Try each queue in order until we successfully dequeue a message + for (const messageQueue of queues) { + try { + const messageData = await this.#callDequeueMessage({ + messageQueue, + parentQueue, + concurrencyLimitKey: this.keys.concurrencyLimitKeyFromQueue(messageQueue), + currentConcurrencyKey: this.keys.currentConcurrencyKeyFromQueue(messageQueue), + envConcurrencyLimitKey: this.keys.envConcurrencyLimitKeyFromQueue(messageQueue), + envCurrentConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(messageQueue), + orgConcurrencyLimitKey: this.keys.orgConcurrencyLimitKeyFromQueue(messageQueue), + orgCurrentConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue), + }); - await this.options.subscriber?.messageDequeued(message); + if (!messageData) { + continue; // Try next queue if no message was dequeued + } + + const message = await this.readMessage(messageData.messageId); + + if (message) { + const ageOfMessageInMs = Date.now() - message.timestamp; + + span.setAttributes({ + [SEMATTRS_MESSAGE_ID]: message.messageId, + [SemanticAttributes.QUEUE]: message.queue, + [SemanticAttributes.MESSAGE_ID]: message.messageId, + [SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey, + [SemanticAttributes.PARENT_QUEUE]: message.parentQueue, + age_in_seconds: ageOfMessageInMs / 1000, + attempted_queues: queues.indexOf(messageQueue) + 1, // How many queues we tried before success + }); + + await this.options.subscriber?.messageDequeued(message); + + await this.options.visibilityTimeoutStrategy.heartbeat( + messageData.messageId, + this.visibilityTimeoutInMs + ); + + return message; + } + } catch (error) { + // Log error but continue trying other queues + logger.warn(`[${this.name}] Failed to dequeue from queue ${messageQueue}`, { error }); + continue; + } } - await 
this.options.visibilityTimeoutStrategy.heartbeat( - messageData.messageId, - this.visibilityTimeoutInMs - ); - - return message; + // If we get here, we tried all queues but couldn't dequeue a message + span.setAttribute("attempted_queues", queues.length); + return; }, { kind: SpanKind.CONSUMER, @@ -442,7 +494,6 @@ export class MarQS { parentQueue: message.parentQueue, messageKey: this.keys.messageKey(messageId), messageQueue: message.queue, - visibilityQueue: constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE, concurrencyKey: this.keys.currentConcurrencyKeyFromQueue(message.queue), envConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(message.queue), orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(message.queue), @@ -511,7 +562,6 @@ export class MarQS { parentQueue: oldMessage.parentQueue, messageKey: this.keys.messageKey(messageId), messageQueue: oldMessage.queue, - visibilityQueue: constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE, concurrencyKey: this.keys.currentConcurrencyKeyFromQueue(oldMessage.queue), envConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(oldMessage.queue), orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(oldMessage.queue), @@ -636,6 +686,11 @@ export class MarQS { span.recordException(new Error(String(e))); } + span.setStatus({ + code: SpanStatusCode.ERROR, + message: e instanceof Error ? 
e.message : "Unknown error", + }); + + throw e; } finally { span.end(); } @@ -658,7 +713,7 @@ const message = await this.readMessage(messageId); if (!message) { - logger.log(`[${this.name}].nackMessage() message not found`, { + logger.debug(`[${this.name}].nackMessage() message not found`, { messageId, retryAt, updates, @@ -667,6 +722,25 @@ return; } + const nackCount = await this.#getNackCount(messageId); + + span.setAttribute("nack_count", nackCount); + + if (nackCount >= this.options.maximumNackCount) { + logger.debug(`[${this.name}].nackMessage() maximum nack count reached`, { + messageId, + retryAt, + updates, + service: this.name, + }); + + span.setAttribute("maximum_nack_count_reached", true); + + // If we have reached the maximum nack count, we will ack the message + await this.acknowledgeMessage(messageId, "maximum nack count reached"); + return; + } + span.setAttributes({ [SemanticAttributes.QUEUE]: message.queue, [SemanticAttributes.MESSAGE_ID]: message.messageId, @@ -687,7 +761,7 @@ concurrencyKey: this.keys.currentConcurrencyKeyFromQueue(message.queue), envConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(message.queue), orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(message.queue), - visibilityQueue: constants.MESSAGE_VISIBILITY_TIMEOUT_QUEUE, + nackCounterKey: this.keys.nackCounterKey(messageId), messageId, messageScore: retryAt, }); @@ -705,6 +779,12 @@ ); } + async #getNackCount(messageId: string): Promise<number> { + const result = await this.redis.get(this.keys.nackCounterKey(messageId)); + + return result ?
Number(result) : 0; + } + // This should increment by the number of seconds, but with a max value of Date.now() + visibilityTimeoutInMs public async heartbeatMessage(messageId: string) { await this.options.visibilityTimeoutStrategy.heartbeat(messageId, this.visibilityTimeoutInMs); @@ -749,82 +829,55 @@ export class MarQS { ); } - async #getRandomQueueFromParentQueue( + async #getPrioritizedQueueCandidates( parentQueue: string, + childQueues: Array<{ value: string; score: number }>, queuePriorityStrategy: MarQSQueuePriorityStrategy, calculateCapacities: (queue: string) => Promise, consumerId: string ) { return this.#trace( - "getRandomQueueFromParentQueue", + "getPrioritizedQueueCandidates", async (span) => { - span.setAttribute("consumerId", consumerId); + span.setAttribute(SemanticAttributes.CONSUMER_ID, consumerId); + span.setAttribute(SemanticAttributes.PARENT_QUEUE, parentQueue); + + span.setAttribute("parent_queue_length", childQueues.length); const { range } = await queuePriorityStrategy.nextCandidateSelection( parentQueue, consumerId ); - const queues = await this.#getChildQueuesWithScores(parentQueue, range, span); - span.setAttribute("queueCount", queues.length); + span.setAttribute("range_offset", range.offset); + span.setAttribute("range_count", range.count); + + const queues = childQueues.slice(range.offset, range.offset + range.count); + + span.setAttribute("queue_count", queues.length); const queuesWithScores = await this.#calculateQueueScores(queues, calculateCapacities); - span.setAttribute("queuesWithScoresCount", queuesWithScores.length); - // We need to priority shuffle here to ensure all workers aren't just working on the highest priority queue - const { choice, nextRange } = this.queuePriorityStrategy.chooseQueue( - queuesWithScores, + span.setAttribute("queues_with_scores_count", queuesWithScores.length); + + // Get weighted distribution of queues instead of a single choice + const weightedQueues = 
queuePriorityStrategy.distributeQueues(queuesWithScores); + + const nextRange = queuePriorityStrategy.moveToNextRange( parentQueue, consumerId, - range + range, + childQueues.length ); - span.setAttributes({ - ...flattenAttributes(queues, "marqs.queues"), - }); - span.setAttributes({ - ...flattenAttributes(queuesWithScores, "marqs.queuesWithScores"), - }); - span.setAttribute("range.offset", range.offset); - span.setAttribute("range.count", range.count); - span.setAttribute("nextRange.offset", nextRange.offset); - span.setAttribute("nextRange.count", nextRange.count); - - if (this.options.verbose || nextRange.offset > 0) { - if (typeof choice === "string") { - logger.debug(`[${this.name}] getRandomQueueFromParentQueue`, { - queues, - queuesWithScores, - range, - nextRange, - queueCount: queues.length, - queuesWithScoresCount: queuesWithScores.length, - queueChoice: choice, - consumerId, - }); - } else { - logger.debug(`[${this.name}] getRandomQueueFromParentQueue`, { - queues, - queuesWithScores, - range, - nextRange, - queueCount: queues.length, - queuesWithScoresCount: queuesWithScores.length, - noQueueChoice: true, - consumerId, - }); - } - } - - if (typeof choice !== "string") { - span.setAttribute("noQueueChoice", true); - - return; - } else { - span.setAttribute("queueChoice", choice); + span.setAttribute("next_range_offset", nextRange.offset); - return choice; + // Next time we dequeue we will re-fetch the queues of the parent for this consumer + if (nextRange.offset === 0) { + this.#evictConsumerQueues(parentQueue, consumerId); } + + return weightedQueues; }, { kind: SpanKind.CONSUMER, @@ -841,67 +894,176 @@ export class MarQS { async #calculateQueueScores( queues: Array<{ value: string; score: number }>, calculateCapacities: (queue: string) => Promise - ) { - const now = Date.now(); + ): Promise> { + return await this.#trace("calculateQueueScores", async (span) => { + const now = Date.now(); + + const values = await Promise.all( + queues.map(async (queue) 
=> { + return { + queue: queue.value, + capacities: await calculateCapacities(queue.value), + age: now - queue.score, + size: await this.#getQueueSize(queue.value), + }; + }) + ); + + return values; + }); + } + + async #getQueueSize(queue: string) { + const result = await this._cache.childQueueSize.swr(queue, async () => { + return await this.redis.zcard(queue); + }); - const queueScores = await Promise.all( - queues.map(async (queue) => { + return result.val ?? 0; + } + + async #calculateMessageQueueCapacities( + queue: string, + options?: { checkForDisabled?: boolean } + ): Promise<QueueCapacities> { + if (options?.checkForDisabled) { + const isDisabled = await this.#getConcurrencyDisabled(queue); + + if (isDisabled) { return { - queue: queue.value, - capacities: await calculateCapacities(queue.value), - age: now - queue.score, - size: await this.redis.zcard(queue.value), + queue: { current: 0, limit: 0 }, + env: { current: 0, limit: 0 }, + org: { current: 0, limit: 0 }, }; - }) - ); + } + } + + // Now we need to calculate the queue concurrency limits, using a cache + const [queueLimit, envLimit, orgLimit, currentConcurrencies] = await Promise.all([ + this.#getQueueConcurrencyLimit(queue), + this.#getEnvConcurrencyLimit(queue), + this.#getOrgConcurrencyLimit(queue), + this.#callCalculateQueueCurrentConcurrencies({ + currentConcurrencyKey: this.keys.currentConcurrencyKeyFromQueue(queue), + currentEnvConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(queue), + currentOrgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(queue), + }), + ]); - return queueScores; + return { + queue: { current: currentConcurrencies.queue, limit: queueLimit }, + env: { current: currentConcurrencies.env, limit: envLimit }, + org: { current: currentConcurrencies.org, limit: orgLimit }, + }; } - async #calculateMessageQueueCapacities(queue: string, options?: { checkForDisabled?: boolean }) { - return await this.#callCalculateMessageCapacities({ - currentConcurrencyKey: 
this.keys.currentConcurrencyKeyFromQueue(queue), - currentEnvConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(queue), - currentOrgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(queue), - concurrencyLimitKey: this.keys.concurrencyLimitKeyFromQueue(queue), - envConcurrencyLimitKey: this.keys.envConcurrencyLimitKeyFromQueue(queue), - orgConcurrencyLimitKey: this.keys.orgConcurrencyLimitKeyFromQueue(queue), - disabledConcurrencyLimitKey: options?.checkForDisabled - ? this.keys.disabledConcurrencyLimitKeyFromQueue(queue) - : undefined, + async #getConcurrencyDisabled(queue: string) { + const key = this.keys.disabledConcurrencyLimitKeyFromQueue(queue); + + const result = await this._cache.disabledConcurrency.swr(key, async () => { + const value = await this.redis.exists(key); + + return Boolean(value); }); + + return typeof result.val === "boolean" ? result.val : false; } - async #getChildQueuesWithScores( - key: string, - range: QueueRange, - span?: Span - ): Promise> { - const valuesWithScores = await this.redis.zrangebyscore( - key, - "-inf", - Date.now(), - "WITHSCORES", - "LIMIT", - range.offset, - range.count + async #getOrgConcurrencyLimit(queue: string) { + const key = this.keys.orgConcurrencyLimitKeyFromQueue(queue); + + const result = await this._cache.envAndOrgConcurrencyLimit.swr(key, async () => { + const value = await this.redis.get(key); + + if (!value) { + return this.options.defaultOrgConcurrency; + } + + return Number(value); + }); + + return result.val ?? this.options.defaultOrgConcurrency; + } + + async #getEnvConcurrencyLimit(queue: string) { + const key = this.keys.envConcurrencyLimitKeyFromQueue(queue); + + const result = await this._cache.envAndOrgConcurrencyLimit.swr(key, async () => { + const value = await this.redis.get(key); + + if (!value) { + return this.options.defaultEnvConcurrency; + } + + return Number(value); + }); + + return result.val ?? 
this.options.defaultEnvConcurrency; + } + + async #getQueueConcurrencyLimit(queue: string) { + const key = this.keys.concurrencyLimitKeyFromQueue(queue); + const defaultValue = Math.min( + this.options.defaultEnvConcurrency, + this.options.defaultOrgConcurrency ); - span?.setAttribute("zrangebyscore.valuesWithScores.rawLength", valuesWithScores.length); - span?.setAttributes({ - ...flattenAttributes(valuesWithScores, "zrangebyscore.valuesWithScores.rawValues"), + const result = await this._cache.queueConcurrencyLimit.swr(key, async () => { + const value = await this.redis.get(key); + + if (!value) { + return defaultValue; + } + + return Number(value); }); - const result: Array<{ value: string; score: number }> = []; + return result.val ?? defaultValue; + } - for (let i = 0; i < valuesWithScores.length; i += 2) { - result.push({ - value: valuesWithScores[i], - score: Number(valuesWithScores[i + 1]), - }); + #evictConsumerQueues(parentQueue: string, consumerId: string) { + this._consumerQueues.delete([parentQueue, consumerId].join(":")); + } + + async #allChildQueuesForConsumer( + key: string, + consumerId: string + ): Promise> { + const cachedQueues = this._consumerQueues.get([key, consumerId].join(":")); + + if (cachedQueues) { + return cachedQueues; } - return result; + return await this.#trace("allChildQueuesForConsumer", async (span) => { + span.setAttribute(SemanticAttributes.CONSUMER_ID, consumerId); + span.setAttribute(SemanticAttributes.PARENT_QUEUE, key); + + const valuesWithScores = await this.redis.zrangebyscore( + key, + "-inf", + Date.now(), + "WITHSCORES" + ); + + const result: Array<{ value: string; score: number }> = []; + + for (let i = 0; i < valuesWithScores.length; i += 2) { + result.push({ + value: valuesWithScores[i], + score: Number(valuesWithScores[i + 1]), + }); + } + + span.setAttribute("queue_count", result.length); + + if (result.length === 0) { + return result; + } + + this._consumerQueues.set([key, consumerId].join(":"), result); + + 
return result; + }); } #startRebalanceWorkers() { @@ -1093,7 +1255,6 @@ export class MarQS { async #callDequeueMessage({ messageQueue, parentQueue, - visibilityQueue, concurrencyLimitKey, envConcurrencyLimitKey, orgConcurrencyLimitKey, @@ -1103,7 +1264,6 @@ export class MarQS { }: { messageQueue: string; parentQueue: string; - visibilityQueue: string; concurrencyLimitKey: string; envConcurrencyLimitKey: string; orgConcurrencyLimitKey: string; @@ -1162,7 +1322,6 @@ export class MarQS { parentQueue, messageKey, messageQueue, - visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, @@ -1171,7 +1330,6 @@ export class MarQS { parentQueue: string; messageKey: string; messageQueue: string; - visibilityQueue: string; concurrencyKey: string; envConcurrencyKey: string; orgConcurrencyKey: string; @@ -1180,7 +1338,6 @@ export class MarQS { logger.debug("Calling acknowledgeMessage", { messageKey, messageQueue, - visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, @@ -1193,7 +1350,6 @@ export class MarQS { parentQueue, messageKey, messageQueue, - visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, @@ -1210,7 +1366,7 @@ export class MarQS { concurrencyKey, envConcurrencyKey, orgConcurrencyKey, - visibilityQueue, + nackCounterKey, messageId, messageScore, }: { @@ -1220,7 +1376,7 @@ export class MarQS { concurrencyKey: string; envConcurrencyKey: string; orgConcurrencyKey: string; - visibilityQueue: string; + nackCounterKey: string; messageId: string; messageScore: number; }) { @@ -1231,7 +1387,7 @@ export class MarQS { concurrencyKey, envConcurrencyKey, orgConcurrencyKey, - visibilityQueue, + nackCounterKey, messageId, messageScore, service: this.name, @@ -1244,8 +1400,8 @@ export class MarQS { concurrencyKey, envConcurrencyKey, orgConcurrencyKey, - visibilityQueue, this.keys.envQueueKeyFromQueue(messageQueue), + nackCounterKey, messageQueue, messageId, String(Date.now()), @@ -1253,58 +1409,29 @@ export class MarQS { ); } - 
async #callCalculateMessageCapacities({ + async #callCalculateQueueCurrentConcurrencies({ currentConcurrencyKey, currentEnvConcurrencyKey, currentOrgConcurrencyKey, - concurrencyLimitKey, - envConcurrencyLimitKey, - orgConcurrencyLimitKey, - disabledConcurrencyLimitKey, }: { currentConcurrencyKey: string; currentEnvConcurrencyKey: string; currentOrgConcurrencyKey: string; - concurrencyLimitKey: string; - envConcurrencyLimitKey: string; - orgConcurrencyLimitKey: string; - disabledConcurrencyLimitKey: string | undefined; - }): Promise { - const capacities = disabledConcurrencyLimitKey - ? await this.redis.calculateMessageQueueCapacitiesWithDisabling( - currentConcurrencyKey, - currentEnvConcurrencyKey, - currentOrgConcurrencyKey, - concurrencyLimitKey, - envConcurrencyLimitKey, - orgConcurrencyLimitKey, - disabledConcurrencyLimitKey, - String(this.options.defaultEnvConcurrency), - String(this.options.defaultOrgConcurrency) - ) - : await this.redis.calculateMessageQueueCapacities( - currentConcurrencyKey, - currentEnvConcurrencyKey, - currentOrgConcurrencyKey, - concurrencyLimitKey, - envConcurrencyLimitKey, - orgConcurrencyLimitKey, - String(this.options.defaultEnvConcurrency), - String(this.options.defaultOrgConcurrency) - ); + }) { + const currentConcurrencies = await this.redis.calculateQueueCurrentConcurrencies( + currentConcurrencyKey, + currentEnvConcurrencyKey, + currentOrgConcurrencyKey + ); - const queueCurrent = Number(capacities[0]); - const envLimit = Number(capacities[3]); - const orgLimit = Number(capacities[5]); - const queueLimit = capacities[1] ? 
Number(capacities[1]) : Math.min(envLimit, orgLimit); - const envCurrent = Number(capacities[2]); - const orgCurrent = Number(capacities[4]); + const orgCurrent = Number(currentConcurrencies[0]); + const envCurrent = Number(currentConcurrencies[1]); + const queueCurrent = Number(currentConcurrencies[2]); - // [queue current, queue limit, env current, env limit, org current, org limit] return { - queue: { current: queueCurrent, limit: queueLimit }, - env: { current: envCurrent, limit: envLimit }, - org: { current: orgCurrent, limit: orgLimit }, + queue: queueCurrent, + env: envCurrent, + org: orgCurrent, }; } @@ -1492,17 +1619,16 @@ redis.call('SET', messageKey, messageData, 'GET') }); this.redis.defineCommand("acknowledgeMessage", { - numberOfKeys: 8, + numberOfKeys: 7, lua: ` --- Keys: parentQueue, messageKey, messageQueue, visibilityQueue, concurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey +-- Keys: parentQueue, messageKey, messageQueue, concurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey local parentQueue = KEYS[1] local messageKey = KEYS[2] local messageQueue = KEYS[3] -local visibilityQueue = KEYS[4] -local concurrencyKey = KEYS[5] -local envCurrentConcurrencyKey = KEYS[6] -local orgCurrentConcurrencyKey = KEYS[7] -local envQueueKey = KEYS[8] +local concurrencyKey = KEYS[4] +local envCurrentConcurrencyKey = KEYS[5] +local orgCurrentConcurrencyKey = KEYS[6] +local envQueueKey = KEYS[7] -- Args: messageId, messageQueueName local messageId = ARGV[1] @@ -1525,9 +1651,6 @@ else redis.call('ZADD', parentQueue, earliestMessage[2], messageQueueName) end --- Remove the message from the timeout queue (deprecated, will eventually remove this) -redis.call('ZREM', visibilityQueue, messageId) - -- Update the concurrency keys redis.call('SREM', concurrencyKey, messageId) redis.call('SREM', envCurrentConcurrencyKey, messageId) @@ -1538,15 +1661,14 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId) this.redis.defineCommand("nackMessage", 
{ numberOfKeys: 8, lua: ` --- Keys: childQueueKey, parentQueueKey, visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, messageId local messageKey = KEYS[1] local childQueueKey = KEYS[2] local parentQueueKey = KEYS[3] local concurrencyKey = KEYS[4] local envConcurrencyKey = KEYS[5] local orgConcurrencyKey = KEYS[6] -local visibilityQueue = KEYS[7] -local envQueueKey = KEYS[8] +local envQueueKey = KEYS[7] +local nackCounterKey = KEYS[8] -- Args: childQueueName, messageId, currentTime, messageScore local childQueueName = ARGV[1] @@ -1559,20 +1681,16 @@ redis.call('SREM', concurrencyKey, messageId) redis.call('SREM', envConcurrencyKey, messageId) redis.call('SREM', orgConcurrencyKey, messageId) --- Check to see if the message is still in the visibilityQueue -local messageVisibility = tonumber(redis.call('ZSCORE', visibilityQueue, messageId)) or 0 - -if messageVisibility > 0 then --- Remove the message from the timeout queue (deprecated, will eventually remove this) - redis.call('ZREM', visibilityQueue, messageId) -end - -- Enqueue the message into the queue redis.call('ZADD', childQueueKey, messageScore, messageId) -- Enqueue the message into the env queue redis.call('ZADD', envQueueKey, messageScore, messageId) +-- Increment the nack counter with an expiry of 30 days +redis.call('INCR', nackCounterKey) +redis.call('EXPIRE', nackCounterKey, 2592000) + -- Rebalance the parent queue local earliestMessage = redis.call('ZRANGE', childQueueKey, 0, 0, 'WITHSCORES') if #earliestMessage == 0 then @@ -1601,99 +1719,21 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId) `, }); - this.redis.defineCommand("heartbeatMessage", { - numberOfKeys: 1, - lua: ` --- Keys: visibilityQueue -local visibilityQueue = KEYS[1] - --- Args: messageId, milliseconds, maxVisibilityTimeout -local messageId = ARGV[1] -local milliseconds = tonumber(ARGV[2]) -local maxVisibilityTimeout = tonumber(ARGV[3]) - --- Get the current visibility timeout -local zscoreResult = 
redis.call('ZSCORE', visibilityQueue, messageId) - --- If there's no currentVisibilityTimeout, return and do not execute ZADD -if zscoreResult == false then - return -end - -local currentVisibilityTimeout = tonumber(zscoreResult) - - --- Calculate the new visibility timeout -local newVisibilityTimeout = math.min(currentVisibilityTimeout + milliseconds * 1000, maxVisibilityTimeout) - --- Update the visibility timeout -redis.call('ZADD', visibilityQueue, newVisibilityTimeout, messageId) - `, - }); - - this.redis.defineCommand("calculateMessageQueueCapacitiesWithDisabling", { - numberOfKeys: 7, - lua: ` --- Keys: currentConcurrencyKey, currentEnvConcurrencyKey, currentOrgConcurrencyKey, concurrencyLimitKey, envConcurrencyLimitKey, orgConcurrencyLimitKey, disabledConcurrencyLimitKey -local currentConcurrencyKey = KEYS[1] -local currentEnvConcurrencyKey = KEYS[2] -local currentOrgConcurrencyKey = KEYS[3] -local concurrencyLimitKey = KEYS[4] -local envConcurrencyLimitKey = KEYS[5] -local orgConcurrencyLimitKey = KEYS[6] -local disabledConcurrencyLimitKey = KEYS[7] - --- Args defaultEnvConcurrencyLimit, defaultOrgConcurrencyLimit -local defaultEnvConcurrencyLimit = tonumber(ARGV[1]) -local defaultOrgConcurrencyLimit = tonumber(ARGV[2]) - -local currentOrgConcurrency = tonumber(redis.call('SCARD', currentOrgConcurrencyKey) or '0') - --- Check if disabledConcurrencyLimitKey exists -local orgConcurrencyLimit -if redis.call('EXISTS', disabledConcurrencyLimitKey) == 1 then - orgConcurrencyLimit = 0 -else - orgConcurrencyLimit = tonumber(redis.call('GET', orgConcurrencyLimitKey) or defaultOrgConcurrencyLimit) -end - -local currentEnvConcurrency = tonumber(redis.call('SCARD', currentEnvConcurrencyKey) or '0') -local envConcurrencyLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit) - -local currentConcurrency = tonumber(redis.call('SCARD', currentConcurrencyKey) or '0') -local concurrencyLimit = redis.call('GET', concurrencyLimitKey) - --- 
Return current capacity and concurrency limits for the queue, env, org -return { currentConcurrency, concurrencyLimit, currentEnvConcurrency, envConcurrencyLimit, currentOrgConcurrency, orgConcurrencyLimit } - `, - }); - - this.redis.defineCommand("calculateMessageQueueCapacities", { - numberOfKeys: 6, + this.redis.defineCommand("calculateQueueCurrentConcurrencies", { + numberOfKeys: 3, lua: ` --- Keys: currentConcurrencyKey, currentEnvConcurrencyKey, currentOrgConcurrencyKey, concurrencyLimitKey, envConcurrencyLimitKey, orgConcurrencyLimitKey +-- Keys: currentConcurrencyKey, currentEnvConcurrencyKey, currentOrgConcurrencyKey local currentConcurrencyKey = KEYS[1] local currentEnvConcurrencyKey = KEYS[2] local currentOrgConcurrencyKey = KEYS[3] -local concurrencyLimitKey = KEYS[4] -local envConcurrencyLimitKey = KEYS[5] -local orgConcurrencyLimitKey = KEYS[6] - --- Args defaultEnvConcurrencyLimit, defaultOrgConcurrencyLimit -local defaultEnvConcurrencyLimit = tonumber(ARGV[1]) -local defaultOrgConcurrencyLimit = tonumber(ARGV[2]) local currentOrgConcurrency = tonumber(redis.call('SCARD', currentOrgConcurrencyKey) or '0') -local orgConcurrencyLimit = tonumber(redis.call('GET', orgConcurrencyLimitKey) or defaultOrgConcurrencyLimit) local currentEnvConcurrency = tonumber(redis.call('SCARD', currentEnvConcurrencyKey) or '0') -local envConcurrencyLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit) local currentConcurrency = tonumber(redis.call('SCARD', currentConcurrencyKey) or '0') -local concurrencyLimit = redis.call('GET', concurrencyLimitKey) --- Return current capacity and concurrency limits for the queue, env, org -return { currentConcurrency, concurrencyLimit, currentEnvConcurrency, envConcurrencyLimit, currentOrgConcurrency, orgConcurrencyLimit } +return { currentOrgConcurrency, currentEnvConcurrency, currentConcurrency } `, }); @@ -1790,7 +1830,6 @@ declare module "ioredis" { parentQueue: string, messageKey: string, 
messageQueue: string, - visibilityQueue: string, concurrencyKey: string, envConcurrencyKey: string, orgConcurrencyKey: string, @@ -1807,8 +1846,8 @@ declare module "ioredis" { concurrencyKey: string, envConcurrencyKey: string, orgConcurrencyKey: string, - visibilityQueue: string, envQueueKey: string, + nackCounterKey: string, childQueueName: string, messageId: string, currentTime: string, @@ -1824,39 +1863,6 @@ declare module "ioredis" { callback?: Callback ): Result; - heartbeatMessage( - visibilityQueue: string, - messageId: string, - milliseconds: string, - maxVisibilityTimeout: string, - callback?: Callback - ): Result; - - calculateMessageQueueCapacities( - currentConcurrencyKey: string, - currentEnvConcurrencyKey: string, - currentOrgConcurrencyKey: string, - concurrencyLimitKey: string, - envConcurrencyLimitKey: string, - orgConcurrencyLimitKey: string, - defaultEnvConcurrencyLimit: string, - defaultOrgConcurrencyLimit: string, - callback?: Callback - ): Result; - - calculateMessageQueueCapacitiesWithDisabling( - currentConcurrencyKey: string, - currentEnvConcurrencyKey: string, - currentOrgConcurrencyKey: string, - concurrencyLimitKey: string, - envConcurrencyLimitKey: string, - orgConcurrencyLimitKey: string, - disabledConcurrencyLimitKey: string, - defaultEnvConcurrencyLimit: string, - defaultOrgConcurrencyLimit: string, - callback?: Callback - ): Result; - updateGlobalConcurrencyLimits( envConcurrencyLimitKey: string, orgConcurrencyLimitKey: string, @@ -1872,6 +1878,13 @@ declare module "ioredis" { currentScore: string, callback?: Callback ): Result; + + calculateQueueCurrentConcurrencies( + currentConcurrencyKey: string, + currentEnvConcurrencyKey: string, + currentOrgConcurrencyKey: string, + callback?: Callback + ): Result; } } @@ -1895,14 +1908,19 @@ function getMarQSClient() { tracer: trace.getTracer("marqs"), keysProducer: new MarQSShortKeyProducer(KEY_PREFIX), visibilityTimeoutStrategy: new V3VisibilityTimeout(), - queuePriorityStrategy: new 
SimpleWeightedChoiceStrategy({ queueSelectionCount: 36 }), - envQueuePriorityStrategy: new SimpleWeightedChoiceStrategy({ queueSelectionCount: 12 }), + queuePriorityStrategy: new SimpleWeightedChoiceStrategy({ + queueSelectionCount: env.MARQS_SHARED_QUEUE_SELECTION_COUNT, + }), + envQueuePriorityStrategy: new SimpleWeightedChoiceStrategy({ + queueSelectionCount: env.MARQS_DEV_QUEUE_SELECTION_COUNT, + }), workers: 1, redis: redisOptions, defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT, defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, visibilityTimeoutInMs: env.MARQS_VISIBILITY_TIMEOUT_MS, enableRebalancing: !env.MARQS_DISABLE_REBALANCING, + maximumNackCount: env.MARQS_MAXIMUM_NACK_COUNT, subscriber: concurrencyTracker, }); } else { @@ -1912,8 +1930,3 @@ function getMarQSClient() { } } } - -// Only allow alphanumeric characters, underscores, hyphens, and slashes (and only the first 128 characters) -export function sanitizeQueueName(queueName: string) { - return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128); -} diff --git a/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts b/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts index 130cae1e14..0a6dd0ecb8 100644 --- a/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts +++ b/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts @@ -142,6 +142,10 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { return `${constants.MESSAGE_PART}:${messageId}`; } + nackCounterKey(messageId: string): string { + return `${constants.MESSAGE_PART}:${messageId}:nacks`; + } + private shortId(id: string) { // Return the last 12 characters of the id return id.slice(-12); diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index b0cff44b3c..d2dc99a249 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -9,9 +9,11 @@ import { 
trace, } from "@opentelemetry/api"; import { + AckCallbackResult, MachinePreset, ProdTaskRunExecution, ProdTaskRunExecutionPayload, + QueueOptions, TaskRunError, TaskRunErrorCodes, TaskRunExecution, @@ -27,7 +29,7 @@ import { BackgroundWorker, BackgroundWorkerTask, Prisma, - RuntimeEnvironment, + TaskQueue, TaskRunStatus, } from "@trigger.dev/database"; import { z } from "zod"; @@ -37,8 +39,12 @@ import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; -import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; -import { resolveVariablesForEnvironment } from "../environmentVariables/environmentVariablesRepository.server"; +import { marqs } from "~/v3/marqs/index.server"; +import { + RuntimeEnvironmentForEnvRepo, + RuntimeEnvironmentForEnvRepoPayload, + resolveVariablesForEnvironment, +} from "../environmentVariables/environmentVariablesRepository.server"; import { EnvironmentVariable } from "../environmentVariables/repository"; import { FailedTaskRunService } from "../failedTaskRun.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; @@ -61,6 +67,7 @@ import { import { tracer } from "../tracer.server"; import { getMaxDuration } from "../utils/maxDuration"; import { MessagePayload } from "./types"; +import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server"; const WithTraceContext = z.object({ traceparent: z.string().optional(), @@ -122,15 +129,14 @@ export type SharedQueueConsumerOptions = { type HandleMessageAction = "ack_and_do_more_work" | "nack" | "nack_and_do_more_work" | "noop"; -type DoWorkInternalResult = - | { - reason: string; - attrs?: Record; - error?: Error | string; - interval?: number; - action?: HandleMessageAction; - } - | undefined; +type DoWorkInternalResult = { + reason: string; + outcome: "execution" | 
"retry_with_nack" | "fail_with_ack" | "noop"; + attrs?: Record; + error?: Error | string; + interval?: number; + action?: HandleMessageAction; +}; type HandleMessageResult = { action: HandleMessageAction; @@ -151,6 +157,7 @@ export class SharedQueueConsumer { private _currentSpanContext: Context | undefined; private _reasonStats: Record = {}; private _actionStats: Record = {}; + private _outcomeStats: Record = {}; private _currentSpan: Span | undefined; private _endSpanInNextIteration = false; private _tasks = sharedQueueTasks; @@ -246,6 +253,7 @@ export class SharedQueueConsumer { this._traceStartedAt = new Date(); this._reasonStats = {}; this._actionStats = {}; + this._outcomeStats = {}; this.#doWork().finally(() => {}); } @@ -260,6 +268,10 @@ export class SharedQueueConsumer { this._currentSpan.setAttribute(`actions_${action}`, count); } + for (const [outcome, count] of Object.entries(this._outcomeStats)) { + this._currentSpan.setAttribute(`outcomes_${outcome}`, count); + } + this._currentSpan.end(); } } @@ -318,6 +330,7 @@ export class SharedQueueConsumer { this._traceStartedAt = new Date(); this._reasonStats = {}; this._actionStats = {}; + this._outcomeStats = {}; this._iterationsCount = 0; this._runningDurationInMs = 0; this._endSpanInNextIteration = false; @@ -338,39 +351,33 @@ export class SharedQueueConsumer { try { const result = await this.#doWorkInternal(); - if (result) { - this._reasonStats[result.reason] = (this._reasonStats[result.reason] ?? 0) + 1; + this._reasonStats[result.reason] = (this._reasonStats[result.reason] ?? 0) + 1; + this._outcomeStats[result.outcome] = (this._outcomeStats[result.outcome] ?? 0) + 1; - if (result.action) { - this._actionStats[result.action] = (this._actionStats[result.action] ?? 0) + 1; - } + if (result.action) { + this._actionStats[result.action] = (this._actionStats[result.action] ?? 
0) + 1; + } - span.setAttribute("reason", result.reason); + span.setAttribute("reason", result.reason); - if (result.attrs) { - for (const [key, value] of Object.entries(result.attrs)) { - if (value) { - span.setAttribute(key, value); - } + if (result.attrs) { + for (const [key, value] of Object.entries(result.attrs)) { + if (value) { + span.setAttribute(key, value); } } + } - if (result.error) { - span.recordException(result.error); - span.setStatus({ code: SpanStatusCode.ERROR }); - } - - if (typeof result.interval === "number") { - nextInterval = Math.max(result.interval, 0); // Cannot be negative - } - - span.setAttribute("nextInterval", nextInterval); - } else { - span.setAttribute("reason", "no_result"); + if (result.error) { + span.recordException(result.error); + span.setStatus({ code: SpanStatusCode.ERROR }); + } - this._reasonStats["no_result"] = (this._reasonStats["no_result"] ?? 0) + 1; - this._actionStats["no_result"] = (this._actionStats["no_result"] ?? 0) + 1; + if (typeof result.interval === "number") { + nextInterval = Math.max(result.interval, 0); // Cannot be negative } + + span.setAttribute("nextInterval", nextInterval); } catch (error) { if (error instanceof Error) { this._currentSpan?.recordException(error); @@ -412,7 +419,11 @@ export class SharedQueueConsumer { const message = await marqs?.dequeueMessageInSharedQueue(this._id); if (!message) { - return { reason: "no_message_dequeued", interval: this._options.nextTickInterval }; + return { + reason: "no_message_dequeued", + outcome: "noop", + interval: this._options.nextTickInterval, + }; } logger.log("dequeueMessageInSharedQueue()", { queueMessage: message }); @@ -429,6 +440,7 @@ export class SharedQueueConsumer { return { reason: "failed_to_parse_message", + outcome: "fail_with_ack", attrs: { message_id: message.messageId, message_version: message.version }, error: messageBody.error, }; @@ -453,6 +465,7 @@ export class SharedQueueConsumer { case "noop": { return { reason: 
messageResult.reason ?? "none_specified", + outcome: "execution", attrs: hydrateAttributes(messageResult.attrs ?? {}), error: messageResult.error, interval: messageResult.interval, @@ -464,6 +477,7 @@ export class SharedQueueConsumer { return { reason: messageResult.reason ?? "none_specified", + outcome: "fail_with_ack", attrs: hydrateAttributes(messageResult.attrs ?? {}), error: messageResult.error, interval: messageResult.interval, @@ -475,6 +489,7 @@ export class SharedQueueConsumer { return { reason: messageResult.reason ?? "none_specified", + outcome: "retry_with_nack", attrs: hydrateAttributes(messageResult.attrs ?? {}), error: messageResult.error, interval: messageResult.interval, @@ -486,6 +501,7 @@ export class SharedQueueConsumer { return { reason: messageResult.reason ?? "none_specified", + outcome: "retry_with_nack", attrs: hydrateAttributes(messageResult.attrs ?? {}), error: messageResult.error, action: "nack", @@ -731,12 +747,12 @@ export class SharedQueueConsumer { }; } - const queue = await prisma.taskQueue.findFirst({ - where: { - runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId, - name: sanitizeQueueName(lockedTaskRun.queue), - }, - }); + const queue = await findQueueInEnvironment( + lockedTaskRun.queue, + lockedTaskRun.runtimeEnvironmentId, + lockedTaskRun.lockedById ?? 
undefined, + backgroundTask + ); if (!queue) { logger.debug("SharedQueueConsumer queue not found, so nacking message", { @@ -746,7 +762,7 @@ export class SharedQueueConsumer { }); return { - action: "nack_and_do_more_work", + action: "ack_and_do_more_work", reason: "queue_not_found", attrs: { queue_name: sanitizeQueueName(lockedTaskRun.queue), @@ -1018,12 +1034,11 @@ export class SharedQueueConsumer { }; } - const queue = await prisma.taskQueue.findFirst({ - where: { - runtimeEnvironmentId: resumableAttempt.runtimeEnvironmentId, - name: sanitizeQueueName(resumableRun.queue), - }, - }); + const queue = await findQueueInEnvironment( + resumableRun.queue, + resumableRun.runtimeEnvironmentId, + resumableRun.lockedById ?? undefined + ); if (!queue) { logger.debug("SharedQueueConsumer queue not found, so nacking message", { @@ -1032,7 +1047,7 @@ export class SharedQueueConsumer { }); return { - action: "nack_and_do_more_work", + action: "ack_and_do_more_work", reason: "queue_not_found", attrs: { queue_name: sanitizeQueueName(resumableRun.queue), @@ -1051,81 +1066,11 @@ export class SharedQueueConsumer { }; } - const completions: TaskRunExecutionResult[] = []; - const executions: TaskRunExecution[] = []; - - for (const completedAttemptId of data.completedAttemptIds) { - const completedAttempt = await prisma.taskRunAttempt.findFirst({ - where: { - id: completedAttemptId, - taskRun: { - lockedAt: { - not: null, - }, - lockedById: { - not: null, - }, - }, - }, - }); - - if (!completedAttempt) { - logger.error("Completed attempt not found", { - queueMessage: message.data, - messageId: message.messageId, - }); - - return { - action: "ack_and_do_more_work", - reason: "completed_attempt_not_found", - attrs: { - completed_attempt_id: completedAttemptId, - }, - }; - } - - const completion = await this.#startActiveSpan( - "getCompletionPayloadFromAttempt", - async (span) => { - return await this._tasks.getCompletionPayloadFromAttempt(completedAttempt.id); - } - ); - - if 
(!completion) { - return { - action: "ack_and_do_more_work", - reason: "failed_to_get_completion_payload", - attrs: { - completed_attempt_id: completedAttemptId, - }, - }; - } - - completions.push(completion); - - const executionPayload = await this.#startActiveSpan( - "getExecutionPayloadFromAttempt", - async (span) => { - return await this._tasks.getExecutionPayloadFromAttempt({ - id: completedAttempt.id, - }); - } + try { + const { completions, executions } = await this.#resolveCompletedAttemptsForResumeMessage( + data.completedAttemptIds ); - if (!executionPayload) { - return { - action: "ack_and_do_more_work", - reason: "failed_to_get_execution_payload", - attrs: { - completed_attempt_id: completedAttemptId, - }, - }; - } - - executions.push(executionPayload.execution); - } - - try { const resumeMessage = { version: "v1" as const, runId: resumableAttempt.taskRunId, @@ -1144,10 +1089,54 @@ export class SharedQueueConsumer { // The attempt should still be running so we can broadcast to all coordinators to resume immediately const responses = await this.#startActiveSpan( "emitResumeAfterDependencyWithAck", - async () => { - return await socketIo.coordinatorNamespace - .timeout(10_000) - .emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage); + async (span) => { + try { + const sockets = await this.#startActiveSpan("getCoordinatorSockets", async (span) => { + const sockets = await socketIo.coordinatorNamespace.fetchSockets(); + + span.setAttribute("socket_count", sockets.length); + + return sockets; + }); + + span.setAttribute("socket_count", sockets.length); + span.setAttribute("attempt_id", resumableAttempt.id); + span.setAttribute( + "timeout_in_ms", + env.SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS + ); + + const responses = await socketIo.coordinatorNamespace + .timeout(env.SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS) + .emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage); + + span.setAttribute("response_count", 
responses.length); + + const hasSuccess = responses.some((response) => response.success); + + span.setAttribute("has_success", hasSuccess); + span.setAttribute("is_timeout", false); + + return responses; + } catch (e) { + if (e instanceof Error && "responses" in e && Array.isArray(e.responses)) { + span.setAttribute("is_timeout", false); + + const responses = e.responses as AckCallbackResult[]; + + span.setAttribute("response_count", responses.length); + + const hasSuccess = responses.some( + (response) => "success" in response && response.success + ); + + span.setAttribute("has_success", hasSuccess); + + return responses; + } + + throw e; + } } ); @@ -1278,6 +1267,32 @@ export class SharedQueueConsumer { retryInMs: 5_000, }; } catch (e) { + if (e instanceof ResumePayloadAttemptsNotFoundError) { + return { + action: "ack_and_do_more_work", + reason: "failed_to_get_resume_payloads_for_attempts", + attrs: { + attempt_ids: e.attemptIds.join(","), + }, + }; + } else if (e instanceof ResumePayloadExecutionNotFoundError) { + return { + action: "ack_and_do_more_work", + reason: "failed_to_get_resume_payloads_missing_execution", + attrs: { + attempt_id: e.attemptId, + }, + }; + } else if (e instanceof ResumePayloadCompletionNotFoundError) { + return { + action: "ack_and_do_more_work", + reason: "failed_to_get_resume_payloads_missing_completion", + attrs: { + attempt_id: e.attemptId, + }, + }; + } + logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK threw, nacking with delay", { message, error: e, @@ -1392,6 +1407,57 @@ export class SharedQueueConsumer { }); } + async #resolveCompletedAttemptsForResumeMessage( + completedAttemptIds: string[] + ): Promise<{ completions: TaskRunExecutionResult[]; executions: TaskRunExecution[] }> { + return await this.#startActiveSpan("resolveCompletedAttemptsForResumeMessage", async (span) => { + span.setAttribute("completed_attempt_count", completedAttemptIds.length); + + // Chunk the completedAttemptIds into chunks of 10 + const 
chunkedCompletedAttemptIds = chunk( + completedAttemptIds, + env.SHARED_QUEUE_CONSUMER_RESOLVE_PAYLOADS_BATCH_SIZE + ); + + span.setAttribute("chunk_count", chunkedCompletedAttemptIds.length); + span.setAttribute("chunk_size", env.SHARED_QUEUE_CONSUMER_RESOLVE_PAYLOADS_BATCH_SIZE); + + const allResumePayloads = await this.#startActiveSpan( + "resolveAllResumePayloads", + async (span) => { + span.setAttribute("chunk_count", chunkedCompletedAttemptIds.length); + span.setAttribute("chunk_size", env.SHARED_QUEUE_CONSUMER_RESOLVE_PAYLOADS_BATCH_SIZE); + span.setAttribute("completed_attempt_count", completedAttemptIds.length); + + return await Promise.all( + chunkedCompletedAttemptIds.map(async (attemptIds) => { + const payloads = await this.#startActiveSpan("getResumePayloads", async (span) => { + span.setAttribute("attempt_ids", attemptIds.join(",")); + span.setAttribute("attempt_count", attemptIds.length); + + const payloads = await this._tasks.getResumePayloads(attemptIds); + + span.setAttribute("payload_count", payloads.length); + + return payloads; + }); + + return { + completions: payloads.map((payload) => payload.completion), + executions: payloads.map((payload) => payload.execution), + }; + }) + ); + } + ); + + return { + completions: allResumePayloads.flatMap((payload) => payload.completions), + executions: allResumePayloads.flatMap((payload) => payload.executions), + }; + }); + } + async #startActiveSpan( name: string, fn: (span: Span) => Promise, @@ -1427,52 +1493,125 @@ export class SharedQueueConsumer { } } -type AttemptForCompletion = Prisma.TaskRunAttemptGetPayload<{ - include: { - backgroundWorker: true; - backgroundWorkerTask: true; +class ResumePayloadAttemptsNotFoundError extends Error { + constructor(public readonly attemptIds: string[]) { + super(`Resume payload attempts not found for attempts ${attemptIds.join(", ")}`); + } +} + +class ResumePayloadExecutionNotFoundError extends Error { + constructor(public readonly attemptId: string) { + 
super(`Resume payload execution not found for attempt ${attemptId}`); + } +} + +class ResumePayloadCompletionNotFoundError extends Error { + constructor(public readonly attemptId: string) { + super(`Resume payload completion not found for attempt ${attemptId}`); + } +} + +function chunk(arr: T[], chunkSize: number): T[][] { + return Array.from({ length: Math.ceil(arr.length / chunkSize) }, (_, i) => + arr.slice(i * chunkSize, i * chunkSize + chunkSize) + ); +} + +export const AttemptForCompletionGetPayload = { + select: { + status: true, + output: true, + outputType: true, + error: true, taskRun: { - include: { - runtimeEnvironment: { - include: { - organization: true; - project: true; - }; - }; - tags: true; - }; - }; - queue: true; - }; -}>; - -type AttemptForExecution = Prisma.TaskRunAttemptGetPayload<{ - include: { - backgroundWorker: true; - backgroundWorkerTask: true; + select: { + taskIdentifier: true, + friendlyId: true, + }, + }, + }, +} as const; + +type AttemptForCompletion = Prisma.TaskRunAttemptGetPayload; + +export const AttemptForExecutionGetPayload = { + select: { + id: true, + friendlyId: true, + taskRunId: true, + number: true, + startedAt: true, + createdAt: true, + backgroundWorkerId: true, + backgroundWorkerTaskId: true, + backgroundWorker: { + select: { + contentHash: true, + version: true, + }, + }, + backgroundWorkerTask: { + select: { + machineConfig: true, + slug: true, + filePath: true, + exportName: true, + }, + }, + status: true, runtimeEnvironment: { - include: { - organization: true; - project: true; - }; - }; + select: { + ...RuntimeEnvironmentForEnvRepoPayload.select, + organization: { + select: { + id: true, + slug: true, + title: true, + }, + }, + project: { + select: { + id: true, + externalRef: true, + slug: true, + name: true, + }, + }, + }, + }, taskRun: { - include: { - tags: true; - batchItems: { - include: { - batchTaskRun: { - select: { - friendlyId: true; - }; - }; - }; - }; - }; - }; - queue: true; - }; -}>; + select: { 
+ id: true, + status: true, + traceContext: true, + machinePreset: true, + friendlyId: true, + payload: true, + payloadType: true, + context: true, + createdAt: true, + startedAt: true, + isTest: true, + metadata: true, + metadataType: true, + idempotencyKey: true, + usageDurationMs: true, + costInCents: true, + baseCostInCents: true, + maxDurationInSeconds: true, + tags: true, + }, + }, + queue: { + select: { + name: true, + friendlyId: true, + }, + }, + }, +} as const; + +type AttemptForExecution = Prisma.TaskRunAttemptGetPayload; class SharedQueueTasks { private _completionPayloadFromAttempt(attempt: AttemptForCompletion): TaskRunExecutionResult { @@ -1565,10 +1704,7 @@ class SharedQueueTasks { slug: attempt.runtimeEnvironment.project.slug, name: attempt.runtimeEnvironment.project.name, }, - batch: - taskRun.batchItems[0] && taskRun.batchItems[0].batchTaskRun - ? { id: taskRun.batchItems[0].batchTaskRun.friendlyId } - : undefined, + batch: undefined, // TODO: Removing this for now until we can do it more efficiently worker: { id: attempt.backgroundWorkerId, contentHash: attempt.backgroundWorker.contentHash, @@ -1588,22 +1724,7 @@ class SharedQueueTasks { in: FINAL_ATTEMPT_STATUSES, }, }, - include: { - backgroundWorker: true, - backgroundWorkerTask: true, - taskRun: { - include: { - runtimeEnvironment: { - include: { - organization: true, - project: true, - }, - }, - tags: true, - }, - }, - queue: true, - }, + ...AttemptForCompletionGetPayload, }); if (!attempt) { @@ -1629,31 +1750,7 @@ class SharedQueueTasks { where: { id, }, - include: { - backgroundWorker: true, - backgroundWorkerTask: true, - runtimeEnvironment: { - include: { - organization: true, - project: true, - }, - }, - taskRun: { - include: { - tags: true, - batchItems: { - include: { - batchTaskRun: { - select: { - friendlyId: true, - }, - }, - }, - }, - }, - }, - queue: true, - }, + ...AttemptForExecutionGetPayload, }); if (!attempt) { @@ -1761,36 +1858,17 @@ class SharedQueueTasks { where: { id: 
attemptId, }, - include: { - backgroundWorker: true, - backgroundWorkerTask: true, - runtimeEnvironment: { - include: { - organization: true, - project: true, - }, - }, + select: { + ...AttemptForExecutionGetPayload.select, + error: true, + output: true, + outputType: true, taskRun: { - include: { - runtimeEnvironment: { - include: { - organization: true, - project: true, - }, - }, - tags: true, - batchItems: { - include: { - batchTaskRun: { - select: { - friendlyId: true, - }, - }, - }, - }, + select: { + ...AttemptForExecutionGetPayload.select.taskRun.select, + taskIdentifier: true, }, }, - queue: true, }, }); @@ -1808,6 +1886,62 @@ class SharedQueueTasks { }; } + async getResumePayloads(attemptIds: string[]): Promise< + Array<{ + execution: ProdTaskRunExecution; + completion: TaskRunExecutionResult; + }> + > { + const attempts = await prisma.taskRunAttempt.findMany({ + where: { + id: { + in: attemptIds, + }, + }, + select: { + ...AttemptForExecutionGetPayload.select, + error: true, + output: true, + outputType: true, + taskRun: { + select: { + ...AttemptForExecutionGetPayload.select.taskRun.select, + taskIdentifier: true, + }, + }, + }, + }); + + if (attempts.length !== attemptIds.length) { + logger.error("getResumePayloads: Not all attempts found", { attemptIds }); + + throw new ResumePayloadAttemptsNotFoundError(attemptIds); + } + + const payloads = await Promise.all( + attempts.map(async (attempt) => { + const execution = await this._executionFromAttempt(attempt); + + if (!execution) { + throw new ResumePayloadExecutionNotFoundError(attempt.id); + } + + const completion = this._completionPayloadFromAttempt(attempt); + + if (!completion) { + throw new ResumePayloadCompletionNotFoundError(attempt.id); + } + + return { + execution, + completion, + }; + }) + ); + + return payloads; + } + async getLatestExecutionPayloadFromRun( id: string, setToExecuting?: boolean, @@ -1993,7 +2127,7 @@ class SharedQueueTasks { } async #buildEnvironmentVariables( - environment: 
RuntimeEnvironment, + environment: RuntimeEnvironmentForEnvRepo, runId: string, machinePreset: MachinePreset ): Promise> { diff --git a/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts b/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts index b53e984e95..525046c7b6 100644 --- a/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts +++ b/apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts @@ -26,41 +26,67 @@ export class SimpleWeightedChoiceStrategy implements MarQSQueuePriorityStrategy ); } - chooseQueue( - queues: QueueWithScores[], + moveToNextRange( parentQueue: string, consumerId: string, - previousRange: QueueRange - ): { choice: PriorityStrategyChoice; nextRange: QueueRange } { - const filteredQueues = filterQueuesAtCapacity(queues); - - if (queues.length === this.options.queueSelectionCount) { - const nextRange: QueueRange = { - offset: previousRange.offset + this.options.queueSelectionCount, - count: this.options.queueSelectionCount, - }; + currentRange: QueueRange, + parentQueueSize: number + ): QueueRange { + const nextRange: QueueRange = { + offset: currentRange.offset + currentRange.count, + count: currentRange.count, + }; - // If all queues are at capacity, and we were passed the max number of queues, then we will slide the window "to the right" + // if the nextRange is within the parentQueueSize, set it on the this._nextRangesByParentQueue map and return it + // if the nextRange is outside the parentQueueSize, reset the range to the beginning by deleting the entry from the map + if (nextRange.offset < parentQueueSize) { this._nextRangesByParentQueue.set(`${consumerId}:${parentQueue}`, nextRange); + return nextRange; } else { this._nextRangesByParentQueue.delete(`${consumerId}:${parentQueue}`); + return { offset: 0, count: this.options.queueSelectionCount }; } + } + + distributeQueues(queues: QueueWithScores[]): Array { + const filteredQueues = filterQueuesAtCapacity(queues); if 
(filteredQueues.length === 0) { - return { - choice: { abort: true }, - nextRange: this.nextRangeForParentQueue(parentQueue, consumerId), - }; + return []; } const queueWeights = this.#calculateQueueWeights(filteredQueues); - const choice = weightedRandomChoice(queueWeights); - - return { - choice, - nextRange: this.nextRangeForParentQueue(parentQueue, consumerId), - }; + // Sort queues by weight in descending order + const sortedQueues = [...queueWeights].sort((a, b) => b.totalWeight - a.totalWeight); + + // Convert weights to probabilities + const totalQueueWeight = sortedQueues.reduce((sum, queue) => sum + queue.totalWeight, 0); + const weightedQueues = sortedQueues.map(({ queue, totalWeight }) => ({ + queue, + probability: totalQueueWeight > 0 ? totalWeight / totalQueueWeight : 1.0, + })); + + // Apply some randomization while maintaining general weight order + // This helps prevent all consumers from always picking the same highest-weight queue + const shuffledWeightedQueues = weightedQueues + .map((queueInfo, index) => ({ + ...queueInfo, + // Add some controlled randomness while maintaining general weight order + randomFactor: Math.random() * 0.2 - 0.1, // ±10% random adjustment + originalIndex: index, + })) + .sort((a, b) => { + // If probability difference is significant (>20%), maintain order + if (Math.abs(a.probability - b.probability) > 0.2) { + return b.probability - a.probability; + } + // Otherwise, allow some randomization while keeping similar weights roughly together + return b.probability + b.randomFactor - (a.probability + a.randomFactor); + }) + .map(({ queue }) => queue); + + return shuffledWeightedQueues; } async nextCandidateSelection( @@ -104,32 +130,21 @@ function filterQueuesAtCapacity(queues: QueueWithScores[]) { ); } -function weightedRandomChoice(queues: Array<{ queue: string; totalWeight: number }>) { - const totalWeight = queues.reduce((acc, queue) => acc + queue.totalWeight, 0); - let randomNum = Math.random() * totalWeight; - - for 
(const queue of queues) { - if (randomNum < queue.totalWeight) { - return queue.queue; - } - - randomNum -= queue.totalWeight; +export class NoopWeightedChoiceStrategy implements MarQSQueuePriorityStrategy { + nextCandidateSelection(parentQueue: string): Promise<{ range: QueueRange; selectionId: string }> { + return Promise.resolve({ range: { offset: 0, count: 0 }, selectionId: nanoid(24) }); } - // If we get here, we should just return a random queue - return queues[Math.floor(Math.random() * queues.length)].queue; -} - -export class NoopWeightedChoiceStrategy implements MarQSQueuePriorityStrategy { - chooseQueue( - queues: QueueWithScores[], - parentQueue: string, - selectionId: string - ): { choice: PriorityStrategyChoice; nextRange: QueueRange } { - return { choice: { abort: true }, nextRange: { offset: 0, count: 0 } }; + distributeQueues(queues: Array): Array { + return []; } - nextCandidateSelection(parentQueue: string): Promise<{ range: QueueRange; selectionId: string }> { - return Promise.resolve({ range: { offset: 0, count: 0 }, selectionId: nanoid(24) }); + moveToNextRange( + parentQueue: string, + consumerId: string, + currentRange: QueueRange, + queueSize: number + ): QueueRange { + return { offset: 0, count: 0 }; } } diff --git a/apps/webapp/app/v3/marqs/types.ts b/apps/webapp/app/v3/marqs/types.ts index 7605099b85..425a143773 100644 --- a/apps/webapp/app/v3/marqs/types.ts +++ b/apps/webapp/app/v3/marqs/types.ts @@ -47,28 +47,13 @@ export interface MarQSKeyProducer { envCurrentConcurrencyKey(env: AuthenticatedEnvironment): string; envQueueKeyFromQueue(queue: string): string; messageKey(messageId: string): string; + nackCounterKey(messageId: string): string; stripKeyPrefix(key: string): string; } export type PriorityStrategyChoice = string | { abort: true }; export interface MarQSQueuePriorityStrategy { - /** - * chooseQueue is called to select the next queue to process a message from - * - * @param queues - * @param parentQueue - * @param consumerId - 
* - * @returns The queue to process the message from, or an object with `abort: true` if no queue is available - */ - chooseQueue( - queues: Array, - parentQueue: string, - consumerId: string, - previousRange: QueueRange - ): { choice: PriorityStrategyChoice; nextRange: QueueRange }; - /** * This function is called to get the next candidate selection for the queue * The `range` is used to select the set of queues that will be considered for the next selection (passed to chooseQueue) @@ -80,6 +65,15 @@ export interface MarQSQueuePriorityStrategy { * @returns The scores and the selectionId for the next candidate selection */ nextCandidateSelection(parentQueue: string, consumerId: string): Promise<{ range: QueueRange }>; + + distributeQueues(queues: Array): Array; + + moveToNextRange( + parentQueue: string, + consumerId: string, + currentRange: QueueRange, + queueSize: number + ): QueueRange; } export const MessagePayload = z.object({ diff --git a/apps/webapp/app/v3/marqs/v2.server.ts b/apps/webapp/app/v3/marqs/v2.server.ts index c25e916d17..f0ca2c62e0 100644 --- a/apps/webapp/app/v3/marqs/v2.server.ts +++ b/apps/webapp/app/v3/marqs/v2.server.ts @@ -82,6 +82,7 @@ function getMarQSClient() { defaultEnvConcurrency: env.V2_MARQS_DEFAULT_ENV_CONCURRENCY, // this is so we aren't limited by the environment concurrency defaultOrgConcurrency: env.DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT, visibilityTimeoutInMs: env.V2_MARQS_VISIBILITY_TIMEOUT_MS, // 15 minutes + maximumNackCount: 10, enableRebalancing: false, }); } diff --git a/apps/webapp/app/v3/models/workerDeployment.server.ts b/apps/webapp/app/v3/models/workerDeployment.server.ts index 78d6304baa..096bab6372 100644 --- a/apps/webapp/app/v3/models/workerDeployment.server.ts +++ b/apps/webapp/app/v3/models/workerDeployment.server.ts @@ -44,6 +44,7 @@ type WorkerDeploymentWithWorkerTasks = Prisma.WorkerDeploymentGetPayload<{ triggerSource: true; machineConfig: true; maxDurationInSeconds: true; + queueConfig: true; }; }; }; 
diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index c4fc3a2f83..65b8cd3cc8 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -7,7 +7,7 @@ import type { BackgroundWorker } from "@trigger.dev/database"; import { Prisma, PrismaClientOrTransaction } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; -import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; +import { marqs } from "~/v3/marqs/index.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion"; import { BaseService } from "./baseService.server"; @@ -16,6 +16,7 @@ import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskSched import cronstrue from "cronstrue"; import { CheckScheduleService } from "./checkSchedule.server"; import { clampMaxDuration } from "../utils/maxDuration"; +import { sanitizeQueueName } from "~/models/taskQueue.server"; export class CreateBackgroundWorkerService extends BaseService { public async call( diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index 9bcc2b4f01..bdf8cf3781 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -10,6 +10,7 @@ import { machinePresetFromConfig, machinePresetFromRun } from "../machinePresets import { BaseService, ServiceValidationError } from "./baseService.server"; import { CrashTaskRunService } from "./crashTaskRun.server"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; +import { findQueueInEnvironment } from "~/models/taskQueue.server"; export class 
CreateTaskRunAttemptService extends BaseService { public async call({ @@ -95,18 +96,18 @@ export class CreateTaskRunAttemptService extends BaseService { throw new ServiceValidationError("Task run is cancelled", 400); } - if (!taskRun.lockedBy) { + const lockedBy = taskRun.lockedBy; + + if (!lockedBy) { throw new ServiceValidationError("Task run is not locked", 400); } - const queue = await this._prisma.taskQueue.findUnique({ - where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: environment.id, - name: taskRun.queue, - }, - }, - }); + const queue = await findQueueInEnvironment( + taskRun.queue, + environment.id, + lockedBy.id, + lockedBy + ); if (!queue) { throw new ServiceValidationError("Queue not found", 404); @@ -121,7 +122,7 @@ export class CreateTaskRunAttemptService extends BaseService { if (nextAttemptNumber > MAX_TASK_RUN_ATTEMPTS) { const service = new CrashTaskRunService(this._prisma); await service.call(taskRun.id, { - reason: taskRun.lockedBy.worker.supportsLazyAttempts + reason: lockedBy.worker.supportsLazyAttempts ? "Max attempts reached." : "Max attempts reached. Please upgrade your CLI and SDK.", }); @@ -136,8 +137,8 @@ export class CreateTaskRunAttemptService extends BaseService { friendlyId: generateFriendlyId("attempt"), taskRunId: taskRun.id, startedAt: new Date(), - backgroundWorkerId: taskRun.lockedBy!.worker.id, - backgroundWorkerTaskId: taskRun.lockedBy!.id, + backgroundWorkerId: lockedBy.worker.id, + backgroundWorkerTaskId: lockedBy.id, status: setToExecuting ? "EXECUTING" : "PENDING", queueId: queue.id, runtimeEnvironmentId: environment.id, @@ -174,8 +175,7 @@ export class CreateTaskRunAttemptService extends BaseService { } const machinePreset = - machinePresetFromRun(taskRun) ?? - machinePresetFromConfig(taskRun.lockedBy.machineConfig ?? {}); + machinePresetFromRun(taskRun) ?? machinePresetFromConfig(lockedBy.machineConfig ?? {}); const metadata = await parsePacket({ data: taskRun.metadata ?? 
undefined, @@ -184,16 +184,16 @@ export class CreateTaskRunAttemptService extends BaseService { const execution: TaskRunExecution = { task: { - id: taskRun.lockedBy.slug, - filePath: taskRun.lockedBy.filePath, - exportName: taskRun.lockedBy.exportName, + id: lockedBy.slug, + filePath: lockedBy.filePath, + exportName: lockedBy.exportName, }, attempt: { id: taskRunAttempt.friendlyId, number: taskRunAttempt.number, startedAt: taskRunAttempt.startedAt ?? taskRunAttempt.createdAt, - backgroundWorkerId: taskRun.lockedBy.worker.id, - backgroundWorkerTaskId: taskRun.lockedBy.id, + backgroundWorkerId: lockedBy.worker.id, + backgroundWorkerTaskId: lockedBy.id, status: "EXECUTING" as const, }, run: { @@ -210,7 +210,7 @@ export class CreateTaskRunAttemptService extends BaseService { costInCents: taskRun.costInCents, baseCostInCents: taskRun.baseCostInCents, maxAttempts: taskRun.maxAttempts ?? undefined, - version: taskRun.lockedBy.worker.version, + version: lockedBy.worker.version, metadata, maxDuration: taskRun.maxDurationInSeconds ?? 
undefined, }, diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index b31546cb52..f4b5df5f78 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -1,7 +1,7 @@ import { FlushedRunMetadata, sanitizeError, TaskRunError } from "@trigger.dev/core/v3"; import { type Prisma, type TaskRun } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; -import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; +import { marqs } from "~/v3/marqs/index.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { FINAL_ATTEMPT_STATUSES, @@ -17,6 +17,7 @@ import { socketIo } from "../handleSocketIo.server"; import { ResumeBatchRunService } from "./resumeBatchRun.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { updateMetadataService } from "~/services/metadata/updateMetadata.server"; +import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server"; type BaseInput = { id: string; @@ -291,6 +292,7 @@ export class FinalizeTaskRunService extends BaseService { id: true, workerId: true, runtimeEnvironmentId: true, + queueConfig: true, }, where: { id: run.lockedById, @@ -302,14 +304,12 @@ export class FinalizeTaskRunService extends BaseService { return; } - const queue = await this._prisma.taskQueue.findUnique({ - where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: workerTask.runtimeEnvironmentId, - name: sanitizeQueueName(run.queue), - }, - }, - }); + const queue = await findQueueInEnvironment( + run.queue, + workerTask.runtimeEnvironmentId, + workerTask.id, + workerTask + ); if (!queue) { logger.error("FinalizeTaskRunService: No queue found", { runId: run.id }); diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index c0d1cce189..98c4e21005 100644 --- 
a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -9,7 +9,7 @@ import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { autoIncrementCounter } from "~/services/autoIncrementCounter.server"; import { workerQueue } from "~/services/worker.server"; -import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; +import { marqs } from "~/v3/marqs/index.server"; import { eventRepository } from "../eventRepository.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { uploadPacketToObjectStore } from "../r2.server"; @@ -27,6 +27,7 @@ import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server"; import { clampMaxDuration } from "../utils/maxDuration"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import { Prisma } from "@trigger.dev/database"; +import { sanitizeQueueName } from "~/models/taskQueue.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index 31d6dc3e4f..6147b86c3b 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -16,7 +16,7 @@ import { WaitReason, } from "./schemas.js"; -const ackCallbackResult = z.discriminatedUnion("success", [ +export const AckCallbackResult = z.discriminatedUnion("success", [ z.object({ success: z.literal(false), error: z.object({ @@ -31,6 +31,8 @@ const ackCallbackResult = z.discriminatedUnion("success", [ }), ]); +export type AckCallbackResult = z.infer; + export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [ z.object({ type: z.literal("CANCEL_ATTEMPT"), @@ -287,7 +289,7 @@ export const PlatformToProviderMessages = { projectId: z.string(), deploymentId: z.string(), }), - callback: ackCallbackResult, + callback: AckCallbackResult, }, RESTORE: { message: 
z.object({ @@ -530,7 +532,7 @@ export const PlatformToCoordinatorMessages = { completions: TaskRunExecutionResult.array(), executions: TaskRunExecution.array(), }), - callback: ackCallbackResult, + callback: AckCallbackResult, }, RESUME_AFTER_DURATION: { message: z.object({ diff --git a/references/v3-catalog/src/trigger/queues.ts b/references/v3-catalog/src/trigger/queues.ts new file mode 100644 index 0000000000..ec97993ab6 --- /dev/null +++ b/references/v3-catalog/src/trigger/queues.ts @@ -0,0 +1,42 @@ +import { logger, task, wait } from "@trigger.dev/sdk/v3"; + +export const queuesController = task({ + id: "queues/controller", + run: async ({ + numberOfQueues = 20, + length = 20, + waitSeconds = 3, + }: { + numberOfQueues?: number; + length?: number; + waitSeconds?: number; + }) => { + await queuesTest.batchTriggerAndWait( + Array.from({ length }, (_, i) => ({ + payload: { waitSeconds }, + options: { + queue: { + name: `queue-${i % numberOfQueues}`, + }, + }, + })) + ); + }, +}); + +export const queuesTest = task({ + id: "queues/test", + run: async (payload: { waitSeconds?: number }, { ctx }) => { + await wait.for({ seconds: payload.waitSeconds ?? 1 }); + }, +}); + +export const namedQueueTask = task({ + id: "queues/named-queue", + queue: { + name: "named-queue", + }, + run: async () => { + logger.info("named-queue"); + }, +});