diff --git a/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts b/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts index e46177ec0d..26ef1d8093 100644 --- a/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts +++ b/internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts @@ -448,7 +448,7 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy { // Group queues by env const queuesByEnv = queues.reduce( (acc, queue) => { - if (!acc[`${queue.org}:${queue.project}:${queue.env}`]) { + if (!acc[queue.env]) { acc[queue.env] = []; } acc[queue.env].push(queue); diff --git a/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts b/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts index 4f8f9178fe..853f4ac3e0 100644 --- a/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts @@ -1021,7 +1021,141 @@ describe("FairDequeuingStrategy", () => { expect(selectionPercentages["env-2"]).toBeGreaterThan(40); // Verify that env-4 (lowest average age) gets selected in less than 20% of iterations - expect(selectionPercentages["env-4"] || 0).toBeLessThan(20); + expect(selectionPercentages["env-4"] || 0).toBeLessThan(20); + } + ); + + redisTest( + "#selectTopEnvs groups queues by environment", + async ({ redisOptions: redis }) => { + const keyProducer = new RunQueueFullKeyProducer(); + const strategy = new FairQueueSelectionStrategy({ + redis, + keys: keyProducer, + defaultEnvConcurrencyLimit: 5, + parentQueueLimit: 100, + seed: "group-test", + maximumEnvCount: 2, + }); + + const now = Date.now(); + + // env-1 with two queues from different orgs/projects + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 1000, + queueId: "queue-1-old", + orgId: "org-a", + projectId: "proj-a", + envId: "env-1", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 10, + queueId: "queue-1-new", + orgId: "org-b", + projectId: "proj-b", + envId: "env-1", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 400, + queueId: "queue-2", + orgId: "org-2", + projectId: "proj-2", + envId: "env-2", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 300, + queueId: "queue-3", + orgId: "org-3", + projectId: "proj-3", + envId: "env-3", + }); + + // Setup concurrency limits + await setupConcurrency({ + redis, + keyProducer, + env: { + envId: "env-1", + projectId: "proj-a", + orgId: "org-a", + currentConcurrency: 0, + limit: 5, + }, + }); + + await setupConcurrency({ + redis, + keyProducer, + env: { + envId: "env-1", + projectId: "proj-b", + orgId: "org-b", + currentConcurrency: 0, + limit: 5, + }, + }); + + await setupConcurrency({ + redis, + keyProducer, + env: { + envId: "env-2", + projectId: "proj-2", + orgId: "org-2", + currentConcurrency: 0, + limit: 5, + }, + }); + + await setupConcurrency({ + redis, + keyProducer, + env: { + envId: "env-3", + projectId: "proj-3", + orgId: "org-3", + currentConcurrency: 0, + limit: 5, + }, + }); + + const envResult = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + "consumer-1" + ); + + const result = flattenResults(envResult); + + const queuesByEnv = result.reduce( + (acc, queueId) => { + const envId = keyProducer.envIdFromQueue(queueId); + if (!acc[envId]) { + acc[envId] = []; + } + acc[envId].push(queueId); + return acc; + }, + {} as Record + ); + + expect(Object.keys(queuesByEnv).length).toBe(2); + expect(queuesByEnv["env-1"]).toBeDefined(); + expect(queuesByEnv["env-1"].length).toBe(2); } ); });