Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import {
MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
} from "./constants.server";
import { setInterval } from "node:timers/promises";
import { setInterval as setIntervalAsync } from "node:timers/promises";
import { tryCatch } from "@trigger.dev/core/utils";
import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
import z from "zod";
Expand Down Expand Up @@ -78,6 +78,8 @@ export type MarQSOptions = {
subscriber?: MessageQueueSubscriber;
sharedWorkerQueueConsumerIntervalMs?: number;
sharedWorkerQueueMaxMessageCount?: number;
sharedWorkerQueueCooloffPeriodMs?: number;
sharedWorkerQueueCooloffCountThreshold?: number;
eagerDequeuingEnabled?: boolean;
Comment on lines +81 to 83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Public options added — document semantics and (optionally) wire to env/server config

Two new knobs look good. Please:

  • Document defaults and intent (e.g., “defaults: threshold min=10, period=10s”) in the type/README.
  • Consider wiring these to env so ops can tune without a deploy.

If you want them configurable now, append in getMarQSClient:

     sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
     sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
+    sharedWorkerQueueCooloffPeriodMs: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS,
+    sharedWorkerQueueCooloffCountThreshold: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD,

Run to see if env.server.ts defines these keys:


🏁 Script executed:

#!/bin/bash
rg -nP 'MARQS_SHARED_WORKER_QUEUE_COOLOFF_(PERIOD_MS|COUNT_THRESHOLD)' apps/webapp/app/env.server.ts

Length of output: 102


🏁 Script executed:

#!/bin/bash
# Locate env.server.ts in the repository
echo "Locating env.server.ts files:"
fd -HI 'env.server.ts'

# If found, search for MARQS_SHARED_WORKER_QUEUE entries
ENV_FILE=$(fd -HI 'env.server.ts' | head -n 1)
if [ -n "$ENV_FILE" ] && [ -f "$ENV_FILE" ]; then
  echo -e "\nSearching for MARQS_SHARED_WORKER_QUEUE_ in $ENV_FILE:"
  rg -n 'MARQS_SHARED_WORKER_QUEUE_' -C3 "$ENV_FILE"
else
  echo "env.server.ts not found or path could not be resolved."
fi

Length of output: 1186


Add missing environment variables and document their defaults

The two new knobs (sharedWorkerQueueCooloffPeriodMs and sharedWorkerQueueCooloffCountThreshold) aren’t defined in your Zod‐backed env.server.ts, so they won’t be configurable via environment. Please:

  • In apps/webapp/app/env.server.ts, alongside the existing MARQS_SHARED_WORKER_QUEUE_* entries, add:

    MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS:          z.coerce.number().int().default(10000), // 10 s cool-off period
    MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD:   z.coerce.number().int().default(10),    // 10 messages before cool-off
  • In apps/webapp/app/v3/marqs/index.server.ts, wire them into getMarQSClient:

         sharedWorkerQueueConsumerIntervalMs:      env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
         sharedWorkerQueueMaxMessageCount:         env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
    +    sharedWorkerQueueCooloffPeriodMs:         env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS,
    +    sharedWorkerQueueCooloffCountThreshold:  env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD,
  • Update the type/README to document each option’s intent and defaults (e.g., “defaults: cool-off period = 10000 ms; count threshold = 10 messages”).

🤖 Prompt for AI Agents
In apps/webapp/app/v3/marqs/index.server.ts around lines 81-83 the new options
sharedWorkerQueueCooloffPeriodMs and sharedWorkerQueueCooloffCountThreshold are
present but not exposed via environment; add two entries to
apps/webapp/app/env.server.ts named MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS
and MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD using
z.coerce.number().int() with defaults 10000 and 10 respectively, then read those
env keys and pass their values into getMarQSClient in
apps/webapp/app/v3/marqs/index.server.ts where the other
MARQS_SHARED_WORKER_QUEUE_* options are wired, and finally update the type
definitions/README to document both options and their defaults (cool-off period
= 10000 ms; count threshold = 10 messages).

workerOptions: {
pollIntervalMs?: number;
Expand Down Expand Up @@ -107,6 +109,9 @@ export class MarQS {
public keys: MarQSKeyProducer;
#rebalanceWorkers: Array<AsyncWorker> = [];
private worker: Worker<typeof workerCatalog>;
private queueDequeueCooloffPeriod: Map<string, number> = new Map();
private queueDequeueCooloffCounts: Map<string, number> = new Map();
private clearCooloffPeriodInterval: NodeJS.Timeout;

constructor(private readonly options: MarQSOptions) {
this.redis = options.redis;
Expand All @@ -116,6 +121,12 @@ export class MarQS {
this.#startRebalanceWorkers();
this.#registerCommands();

// This will prevent these cooloff maps from growing indefinitely
this.clearCooloffPeriodInterval = setInterval(() => {
this.queueDequeueCooloffCounts.clear();
this.queueDequeueCooloffPeriod.clear();
}, 60_000 * 10); // 10 minutes

this.worker = new Worker({
name: "marqs-worker",
redisOptions: options.workerOptions.redisOptions,
Expand Down Expand Up @@ -737,7 +748,7 @@ export class MarQS {
let processedCount = 0;

try {
for await (const _ of setInterval(
for await (const _ of setIntervalAsync(
this.options.sharedWorkerQueueConsumerIntervalMs ?? 500,
null,
{
Expand Down Expand Up @@ -821,6 +832,7 @@ export class MarQS {
let attemptedEnvs = 0;
let attemptedQueues = 0;
let messageCount = 0;
let coolOffPeriodCount = 0;

// Try each queue in order, attempt to dequeue a message from each queue, keep going until we've tried all the queues
for (const env of envQueues) {
Expand All @@ -829,6 +841,20 @@ export class MarQS {
for (const messageQueue of env.queues) {
attemptedQueues++;

const cooloffPeriod = this.queueDequeueCooloffPeriod.get(messageQueue);

// If the queue is in a cooloff period, skip attempting to dequeue from it
if (cooloffPeriod) {
// If the cooloff period is still active, skip attempting to dequeue from it
if (cooloffPeriod > Date.now()) {
coolOffPeriodCount++;
continue;
} else {
// If the cooloff period is over, delete the cooloff period and attempt to dequeue from the queue
this.queueDequeueCooloffPeriod.delete(messageQueue);
}
}

await this.#trace(
"attemptDequeue",
async (attemptDequeueSpan) => {
Expand Down Expand Up @@ -862,10 +888,32 @@ export class MarQS {
);

if (!messages || messages.length === 0) {
const cooloffCount = this.queueDequeueCooloffCounts.get(messageQueue) ?? 0;

const cooloffCountThreshold = Math.max(
10,
this.options.sharedWorkerQueueCooloffCountThreshold ?? 10
); // minimum of 10

if (cooloffCount >= cooloffCountThreshold) {
// If no messages were dequeued, set a cooloff period for the queue
// This is to prevent the queue from being dequeued too frequently
// and to give other queues a chance to dequeue messages more frequently
this.queueDequeueCooloffPeriod.set(
messageQueue,
Date.now() + (this.options.sharedWorkerQueueCooloffPeriodMs ?? 10_000) // defaults to 10 seconds
);
this.queueDequeueCooloffCounts.delete(messageQueue);
} else {
this.queueDequeueCooloffCounts.set(messageQueue, cooloffCount + 1);
}

attemptDequeueSpan.setAttribute("message_count", 0);
return null; // Try next queue if no message was dequeued
}

this.queueDequeueCooloffCounts.delete(messageQueue);

messageCount += messages.length;

attemptDequeueSpan.setAttribute("message_count", messages.length);
Expand Down Expand Up @@ -916,6 +964,7 @@ export class MarQS {
span.setAttribute("attempted_queues", attemptedQueues);
span.setAttribute("attempted_envs", attemptedEnvs);
span.setAttribute("message_count", messageCount);
span.setAttribute("cooloff_period_count", coolOffPeriodCount);

return;
},
Expand Down
Loading