diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index a4caf858ea..91a7370b79 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -384,6 +384,7 @@ const EnvironmentSchema = z.object({ .int() .default(60 * 1000 * 15), MARQS_SHARED_QUEUE_LIMIT: z.coerce.number().int().default(1000), + MARQS_MAXIMUM_QUEUE_PER_ENV_COUNT: z.coerce.number().int().default(50), MARQS_DEV_QUEUE_LIMIT: z.coerce.number().int().default(1000), MARQS_MAXIMUM_NACK_COUNT: z.coerce.number().int().default(64), MARQS_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75), diff --git a/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts b/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts index 617e5e391d..b0b3fe89db 100644 --- a/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts +++ b/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts @@ -43,6 +43,11 @@ export type FairDequeuingStrategyOptions = { biases?: FairDequeuingStrategyBiases; reuseSnapshotCount?: number; maximumEnvCount?: number; + /** + * Maximum number of queues to process per environment + * If not provided, all queues in an environment will be processed + */ + maximumQueuePerEnvCount?: number; }; type FairQueueConcurrency = { @@ -216,8 +221,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { return result; } - // Helper method to maintain DRY principle - // Update return type #orderQueuesByEnvs(envs: string[], snapshot: FairQueueSnapshot): Array { const queuesByEnv = snapshot.queues.reduce((acc, queue) => { if (!acc[queue.env]) { @@ -231,11 +234,17 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { if (queuesByEnv[envId]) { // Get ordered queues for this env const orderedQueues = this.#weightedRandomQueueOrder(queuesByEnv[envId]); + + // Apply queue limit if maximumQueuePerEnvCount is set + const limitedQueues = this.options.maximumQueuePerEnvCount + ? orderedQueues.slice(0, this.options.maximumQueuePerEnvCount) + : orderedQueues; + // Only add the env if it has queues - if (orderedQueues.length > 0) { + if (limitedQueues.length > 0) { acc.push({ envId, - queues: orderedQueues.map((queue) => queue.id), + queues: limitedQueues.map((queue) => queue.id), }); } } @@ -512,6 +521,10 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { span.setAttribute("queue_count", result.length); + if (result.length === this.options.parentQueueLimit) { + span.setAttribute("parent_queue_limit_reached", true); + } + return result; }); } diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 98e1996484..1636dba5f0 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -2189,6 +2189,7 @@ function getMarQSClient() { }, reuseSnapshotCount: env.MARQS_REUSE_SNAPSHOT_COUNT, maximumEnvCount: env.MARQS_MAXIMUM_ENV_COUNT, + maximumQueuePerEnvCount: env.MARQS_MAXIMUM_QUEUE_PER_ENV_COUNT, }), envQueuePriorityStrategy: new FairDequeuingStrategy({ tracer: tracer, diff --git a/apps/webapp/test/fairDequeuingStrategy.test.ts b/apps/webapp/test/fairDequeuingStrategy.test.ts index 349ad2201b..63ccf3fc16 100644 --- a/apps/webapp/test/fairDequeuingStrategy.test.ts +++ b/apps/webapp/test/fairDequeuingStrategy.test.ts @@ -1032,6 +1032,431 @@ describe("FairDequeuingStrategy", () => { expect(selectionPercentages["env-4"] || 0).toBeLessThan(20); } ); + + redisTest( + "should respect maximumQueuePerEnvCount when distributing queues", + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + seed: "test-seed-max-queues", + maximumQueuePerEnvCount: 2, // Only take 2 queues per env + }); + + const now = Date.now(); + + // Setup two environments with different numbers of queues + const envSetups = [ + { + envId: "env-1", + queues: [ + { age: 5000 }, // Oldest + { age: 4000 }, + { age: 3000 }, // This should be excluded due to maximumQueuePerEnvCount + ], + }, + { + envId: "env-2", + queues: [ + { age: 2000 }, + { age: 1000 }, // Newest + ], + }, + ]; + + // Setup queues and concurrency for each env + for (const setup of envSetups) { + await setupConcurrency({ + redis, + keyProducer, + env: { id: setup.envId, currentConcurrency: 0, limit: 5 }, + }); + + for (let i = 0; i < setup.queues.length; i++) { + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - setup.queues[i].age, + queueId: `queue-${setup.envId}-${i}`, + orgId: `org-${setup.envId}`, + envId: setup.envId, + }); + } + } + + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + "consumer-1" + ); + + // Verify that each environment has at most 2 queues + for (const envQueues of result) { + expect(envQueues.queues.length).toBeLessThanOrEqual(2); + } + + // Get queues for env-1 (which had 3 queues originally) + const env1Queues = result.find((eq) => eq.envId === "env-1")?.queues ?? []; + + // Should have exactly 2 queues + expect(env1Queues.length).toBe(2); + + // The queues should be the two oldest ones (queue-env-1-0 and queue-env-1-1) + expect(env1Queues).toContain(keyProducer.queueKey("org-env-1", "env-1", "queue-env-1-0")); + expect(env1Queues).toContain(keyProducer.queueKey("org-env-1", "env-1", "queue-env-1-1")); + expect(env1Queues).not.toContain(keyProducer.queueKey("org-env-1", "env-1", "queue-env-1-2")); + + // Get queues for env-2 (which had 2 queues originally) + const env2Queues = result.find((eq) => eq.envId === "env-2")?.queues ?? []; + + // Should still have both queues since it was within the limit + expect(env2Queues.length).toBe(2); + } + ); + + redisTest( + "should fairly distribute queues when using maximumQueuePerEnvCount over time", + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + seed: "test-seed-fair-distribution", + maximumQueuePerEnvCount: 2, // Only take 2 queues at a time + biases: { + concurrencyLimitBias: 0, + availableCapacityBias: 0, + queueAgeRandomization: 0.3, // Add some randomization to allow newer queues a chance + }, + }); + + const now = Date.now(); + + // Setup one environment with 5 queues of different ages + const queues = [ + { age: 5000, id: "queue-0" }, // Oldest + { age: 4000, id: "queue-1" }, + { age: 3000, id: "queue-2" }, + { age: 2000, id: "queue-3" }, + { age: 1000, id: "queue-4" }, // Newest + ]; + + // Setup the environment and its queues + await setupConcurrency({ + redis, + keyProducer, + env: { id: "env-1", currentConcurrency: 0, limit: 5 }, + }); + + for (const queue of queues) { + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - queue.age, + queueId: queue.id, + orgId: "org-1", + envId: "env-1", + }); + } + + // Run multiple iterations and track which queues are selected + const iterations = 1000; + const queueSelectionCounts: Record = {}; + const queuePairings: Record = {}; + + for (let i = 0; i < iterations; i++) { + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + `consumer-${i}` + ); + + // There should be exactly one environment + expect(result.length).toBe(1); + const selectedQueues = result[0].queues; + + // Should always get exactly 2 queues due to maximumQueuePerEnvCount + expect(selectedQueues.length).toBe(2); + + // Track individual queue selections + for (const queueId of selectedQueues) { + const baseQueueId = queueId.split(":").pop()!; + queueSelectionCounts[baseQueueId] = (queueSelectionCounts[baseQueueId] || 0) + 1; + } + + // Track queue pairings to ensure variety + const [first, second] = selectedQueues.map((qId) => qId.split(":").pop()!).sort(); + const pairingKey = `${first}-${second}`; + queuePairings[pairingKey] = (queuePairings[pairingKey] || 0) + 1; + } + + console.log("\nQueue Selection Statistics:"); + for (const [queueId, count] of Object.entries(queueSelectionCounts)) { + const percentage = (count / (iterations * 2)) * 100; // Times 2 because we select 2 queues each time + console.log(`${queueId}: ${percentage.toFixed(2)}% (${count} times)`); + } + + console.log("\nQueue Pairing Statistics:"); + for (const [pair, count] of Object.entries(queuePairings)) { + const percentage = (count / iterations) * 100; + console.log(`${pair}: ${percentage.toFixed(2)}% (${count} times)`); + } + + // Verify that all queues were selected at least once + for (const queue of queues) { + expect(queueSelectionCounts[queue.id]).toBeGreaterThan(0); + } + + // Calculate standard deviation of selection percentages + const selectionPercentages = Object.values(queueSelectionCounts).map( + (count) => (count / (iterations * 2)) * 100 + ); + const stdDev = calculateStandardDeviation(selectionPercentages); + + // The standard deviation should be reasonable given our age bias + // Higher stdDev means more bias towards older queues + // We expect some bias due to queueAgeRandomization being 0.3 + expect(stdDev).toBeLessThan(15); // Allow for age-based bias but not extreme + + // Verify we get different pairings of queues + const uniquePairings = Object.keys(queuePairings).length; + // With 5 queues, we can have 10 possible unique pairs + expect(uniquePairings).toBeGreaterThan(5); // Should see at least half of possible combinations + } + ); + + redisTest( + "should handle maximumQueuePerEnvCount larger than available queues", + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + seed: "test-seed-max-larger", + maximumQueuePerEnvCount: 5, // Larger than the number of queues we'll create + }); + + const now = Date.now(); + + // Setup two environments with different numbers of queues + const envSetups = [ + { + envId: "env-1", + queues: [{ age: 5000 }, { age: 4000 }], + }, + { + envId: "env-2", + queues: [{ age: 3000 }], + }, + ]; + + // Setup queues and concurrency for each env + for (const setup of envSetups) { + await setupConcurrency({ + redis, + keyProducer, + env: { id: setup.envId, currentConcurrency: 0, limit: 5 }, + }); + + for (let i = 0; i < setup.queues.length; i++) { + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - setup.queues[i].age, + queueId: `queue-${setup.envId}-${i}`, + orgId: `org-${setup.envId}`, + envId: setup.envId, + }); + } + } + + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + "consumer-1" + ); + + // Should get all queues from both environments + const env1Queues = result.find((eq) => eq.envId === "env-1")?.queues ?? []; + const env2Queues = result.find((eq) => eq.envId === "env-2")?.queues ?? []; + + // env-1 should have both its queues + expect(env1Queues.length).toBe(2); + // env-2 should have its single queue + expect(env2Queues.length).toBe(1); + } + ); + + redisTest( + "should handle empty environments with maximumQueuePerEnvCount", + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + seed: "test-seed-empty-env", + maximumQueuePerEnvCount: 2, + }); + + const now = Date.now(); + + // Setup two environments, one with queues, one without + await setupConcurrency({ + redis, + keyProducer, + env: { id: "env-1", currentConcurrency: 0, limit: 5 }, + }); + + await setupConcurrency({ + redis, + keyProducer, + env: { id: "env-2", currentConcurrency: 0, limit: 5 }, + }); + + // Only add queues to env-1 + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 5000, + queueId: "queue-1", + orgId: "org-1", + envId: "env-1", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 4000, + queueId: "queue-2", + orgId: "org-1", + envId: "env-1", + }); + + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + "consumer-1" + ); + + // Should only get one environment in the result + expect(result.length).toBe(1); + expect(result[0].envId).toBe("env-1"); + expect(result[0].queues.length).toBe(2); + } + ); + + redisTest( + "should respect maximumQueuePerEnvCount with priority offset queues", + async ({ redisOptions }) => { + const redis = createRedisClient(redisOptions); + + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + seed: "test-seed-priority", + maximumQueuePerEnvCount: 2, + biases: { + concurrencyLimitBias: 0, + availableCapacityBias: 0, + queueAgeRandomization: 0.3, + }, + }); + + const now = Date.now(); + + // Setup queues with a mix of normal and priority offset ages + const queues = [ + { age: 5000, id: "queue-0" }, // Normal age + { age: 4000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET, id: "queue-1" }, // Priority + { age: 3000, id: "queue-2" }, // Normal age + { age: 2000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET, id: "queue-3" }, // Priority + { age: 1000, id: "queue-4" }, // Normal age + ]; + + await setupConcurrency({ + redis, + keyProducer, + env: { id: "env-1", currentConcurrency: 0, limit: 5 }, + }); + + for (const queue of queues) { + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - queue.age, + queueId: queue.id, + orgId: "org-1", + envId: "env-1", + }); + } + + // Run multiple iterations to check distribution + const iterations = 1000; + const queueSelectionCounts: Record = {}; + + for (let i = 0; i < iterations; i++) { + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + `consumer-${i}` + ); + + const selectedQueues = result[0].queues; + for (const queueId of selectedQueues) { + const baseQueueId = queueId.split(":").pop()!; + queueSelectionCounts[baseQueueId] = (queueSelectionCounts[baseQueueId] || 0) + 1; + } + } + + console.log("\nPriority Queue Selection Statistics:"); + for (const [queueId, count] of Object.entries(queueSelectionCounts)) { + const percentage = (count / (iterations * 2)) * 100; + const isPriority = + queues.find((q) => q.id === queueId)?.age! > MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET; + console.log( + `${queueId}${isPriority ? " (priority)" : ""}: ${percentage.toFixed(2)}% (${count} times)` + ); + } + + // Verify all queues get selected + for (const queue of queues) { + expect(queueSelectionCounts[queue.id]).toBeGreaterThan(0); + } + + // Even with priority queues, we should still see a reasonable distribution + const selectionPercentages = Object.values(queueSelectionCounts).map( + (count) => (count / (iterations * 2)) * 100 + ); + const stdDev = calculateStandardDeviation(selectionPercentages); + expect(stdDev).toBeLessThan(20); // Allow for slightly more variance due to priority queues + } + ); }); // Helper function to flatten results for counting