@@ -1235,6 +1235,228 @@ describe("FairDequeuingStrategy", () => {
12351235 expect ( uniquePairings ) . toBeGreaterThan ( 5 ) ; // Should see at least half of possible combinations
12361236 }
12371237 ) ;
1238+
1239+ redisTest (
1240+ "should handle maximumQueuePerEnvCount larger than available queues" ,
1241+ async ( { redisOptions } ) => {
1242+ const redis = createRedisClient ( redisOptions ) ;
1243+
1244+ const keyProducer = createKeyProducer ( "test" ) ;
1245+ const strategy = new FairDequeuingStrategy ( {
1246+ tracer,
1247+ redis,
1248+ keys : keyProducer ,
1249+ defaultEnvConcurrency : 5 ,
1250+ parentQueueLimit : 100 ,
1251+ seed : "test-seed-max-larger" ,
1252+ maximumQueuePerEnvCount : 5 , // Larger than the number of queues we'll create
1253+ } ) ;
1254+
1255+ const now = Date . now ( ) ;
1256+
1257+ // Setup two environments with different numbers of queues
1258+ const envSetups = [
1259+ {
1260+ envId : "env-1" ,
1261+ queues : [ { age : 5000 } , { age : 4000 } ] ,
1262+ } ,
1263+ {
1264+ envId : "env-2" ,
1265+ queues : [ { age : 3000 } ] ,
1266+ } ,
1267+ ] ;
1268+
1269+ // Setup queues and concurrency for each env
1270+ for ( const setup of envSetups ) {
1271+ await setupConcurrency ( {
1272+ redis,
1273+ keyProducer,
1274+ env : { id : setup . envId , currentConcurrency : 0 , limit : 5 } ,
1275+ } ) ;
1276+
1277+ for ( let i = 0 ; i < setup . queues . length ; i ++ ) {
1278+ await setupQueue ( {
1279+ redis,
1280+ keyProducer,
1281+ parentQueue : "parent-queue" ,
1282+ score : now - setup . queues [ i ] . age ,
1283+ queueId : `queue-${ setup . envId } -${ i } ` ,
1284+ orgId : `org-${ setup . envId } ` ,
1285+ envId : setup . envId ,
1286+ } ) ;
1287+ }
1288+ }
1289+
1290+ const result = await strategy . distributeFairQueuesFromParentQueue (
1291+ "parent-queue" ,
1292+ "consumer-1"
1293+ ) ;
1294+
1295+ // Should get all queues from both environments
1296+ const env1Queues = result . find ( ( eq ) => eq . envId === "env-1" ) ?. queues ?? [ ] ;
1297+ const env2Queues = result . find ( ( eq ) => eq . envId === "env-2" ) ?. queues ?? [ ] ;
1298+
1299+ // env-1 should have both its queues
1300+ expect ( env1Queues . length ) . toBe ( 2 ) ;
1301+ // env-2 should have its single queue
1302+ expect ( env2Queues . length ) . toBe ( 1 ) ;
1303+ }
1304+ ) ;
1305+
1306+ redisTest (
1307+ "should handle empty environments with maximumQueuePerEnvCount" ,
1308+ async ( { redisOptions } ) => {
1309+ const redis = createRedisClient ( redisOptions ) ;
1310+
1311+ const keyProducer = createKeyProducer ( "test" ) ;
1312+ const strategy = new FairDequeuingStrategy ( {
1313+ tracer,
1314+ redis,
1315+ keys : keyProducer ,
1316+ defaultEnvConcurrency : 5 ,
1317+ parentQueueLimit : 100 ,
1318+ seed : "test-seed-empty-env" ,
1319+ maximumQueuePerEnvCount : 2 ,
1320+ } ) ;
1321+
1322+ const now = Date . now ( ) ;
1323+
1324+ // Setup two environments, one with queues, one without
1325+ await setupConcurrency ( {
1326+ redis,
1327+ keyProducer,
1328+ env : { id : "env-1" , currentConcurrency : 0 , limit : 5 } ,
1329+ } ) ;
1330+
1331+ await setupConcurrency ( {
1332+ redis,
1333+ keyProducer,
1334+ env : { id : "env-2" , currentConcurrency : 0 , limit : 5 } ,
1335+ } ) ;
1336+
1337+ // Only add queues to env-1
1338+ await setupQueue ( {
1339+ redis,
1340+ keyProducer,
1341+ parentQueue : "parent-queue" ,
1342+ score : now - 5000 ,
1343+ queueId : "queue-1" ,
1344+ orgId : "org-1" ,
1345+ envId : "env-1" ,
1346+ } ) ;
1347+
1348+ await setupQueue ( {
1349+ redis,
1350+ keyProducer,
1351+ parentQueue : "parent-queue" ,
1352+ score : now - 4000 ,
1353+ queueId : "queue-2" ,
1354+ orgId : "org-1" ,
1355+ envId : "env-1" ,
1356+ } ) ;
1357+
1358+ const result = await strategy . distributeFairQueuesFromParentQueue (
1359+ "parent-queue" ,
1360+ "consumer-1"
1361+ ) ;
1362+
1363+ // Should only get one environment in the result
1364+ expect ( result . length ) . toBe ( 1 ) ;
1365+ expect ( result [ 0 ] . envId ) . toBe ( "env-1" ) ;
1366+ expect ( result [ 0 ] . queues . length ) . toBe ( 2 ) ;
1367+ }
1368+ ) ;
1369+
1370+ redisTest (
1371+ "should respect maximumQueuePerEnvCount with priority offset queues" ,
1372+ async ( { redisOptions } ) => {
1373+ const redis = createRedisClient ( redisOptions ) ;
1374+
1375+ const keyProducer = createKeyProducer ( "test" ) ;
1376+ const strategy = new FairDequeuingStrategy ( {
1377+ tracer,
1378+ redis,
1379+ keys : keyProducer ,
1380+ defaultEnvConcurrency : 5 ,
1381+ parentQueueLimit : 100 ,
1382+ seed : "test-seed-priority" ,
1383+ maximumQueuePerEnvCount : 2 ,
1384+ biases : {
1385+ concurrencyLimitBias : 0 ,
1386+ availableCapacityBias : 0 ,
1387+ queueAgeRandomization : 0.3 ,
1388+ } ,
1389+ } ) ;
1390+
1391+ const now = Date . now ( ) ;
1392+
1393+ // Setup queues with a mix of normal and priority offset ages
1394+ const queues = [
1395+ { age : 5000 , id : "queue-0" } , // Normal age
1396+ { age : 4000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET , id : "queue-1" } , // Priority
1397+ { age : 3000 , id : "queue-2" } , // Normal age
1398+ { age : 2000 + MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET , id : "queue-3" } , // Priority
1399+ { age : 1000 , id : "queue-4" } , // Normal age
1400+ ] ;
1401+
1402+ await setupConcurrency ( {
1403+ redis,
1404+ keyProducer,
1405+ env : { id : "env-1" , currentConcurrency : 0 , limit : 5 } ,
1406+ } ) ;
1407+
1408+ for ( const queue of queues ) {
1409+ await setupQueue ( {
1410+ redis,
1411+ keyProducer,
1412+ parentQueue : "parent-queue" ,
1413+ score : now - queue . age ,
1414+ queueId : queue . id ,
1415+ orgId : "org-1" ,
1416+ envId : "env-1" ,
1417+ } ) ;
1418+ }
1419+
1420+ // Run multiple iterations to check distribution
1421+ const iterations = 1000 ;
1422+ const queueSelectionCounts : Record < string , number > = { } ;
1423+
1424+ for ( let i = 0 ; i < iterations ; i ++ ) {
1425+ const result = await strategy . distributeFairQueuesFromParentQueue (
1426+ "parent-queue" ,
1427+ `consumer-${ i } `
1428+ ) ;
1429+
1430+ const selectedQueues = result [ 0 ] . queues ;
1431+ for ( const queueId of selectedQueues ) {
1432+ const baseQueueId = queueId . split ( ":" ) . pop ( ) ! ;
1433+ queueSelectionCounts [ baseQueueId ] = ( queueSelectionCounts [ baseQueueId ] || 0 ) + 1 ;
1434+ }
1435+ }
1436+
1437+ console . log ( "\nPriority Queue Selection Statistics:" ) ;
1438+ for ( const [ queueId , count ] of Object . entries ( queueSelectionCounts ) ) {
1439+ const percentage = ( count / ( iterations * 2 ) ) * 100 ;
1440+ const isPriority =
1441+ queues . find ( ( q ) => q . id === queueId ) ?. age ! > MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET ;
1442+ console . log (
1443+ `${ queueId } ${ isPriority ? " (priority)" : "" } : ${ percentage . toFixed ( 2 ) } % (${ count } times)`
1444+ ) ;
1445+ }
1446+
1447+ // Verify all queues get selected
1448+ for ( const queue of queues ) {
1449+ expect ( queueSelectionCounts [ queue . id ] ) . toBeGreaterThan ( 0 ) ;
1450+ }
1451+
1452+ // Even with priority queues, we should still see a reasonable distribution
1453+ const selectionPercentages = Object . values ( queueSelectionCounts ) . map (
1454+ ( count ) => ( count / ( iterations * 2 ) ) * 100
1455+ ) ;
1456+ const stdDev = calculateStandardDeviation ( selectionPercentages ) ;
1457+ expect ( stdDev ) . toBeLessThan ( 20 ) ; // Allow for slightly more variance due to priority queues
1458+ }
1459+ ) ;
12381460} ) ;
12391461
12401462// Helper function to flatten results for counting
0 commit comments