@@ -1119,6 +1119,122 @@ describe("FairDequeuingStrategy", () => {
11191119 expect ( env2Queues . length ) . toBe ( 2 ) ;
11201120 }
11211121 ) ;
1122+
1123+ redisTest (
1124+ "should fairly distribute queues when using maximumQueuePerEnvCount over time" ,
1125+ async ( { redisOptions } ) => {
1126+ const redis = createRedisClient ( redisOptions ) ;
1127+
1128+ const keyProducer = createKeyProducer ( "test" ) ;
1129+ const strategy = new FairDequeuingStrategy ( {
1130+ tracer,
1131+ redis,
1132+ keys : keyProducer ,
1133+ defaultEnvConcurrency : 5 ,
1134+ parentQueueLimit : 100 ,
1135+ seed : "test-seed-fair-distribution" ,
1136+ maximumQueuePerEnvCount : 2 , // Only take 2 queues at a time
1137+ biases : {
1138+ concurrencyLimitBias : 0 ,
1139+ availableCapacityBias : 0 ,
1140+ queueAgeRandomization : 0.3 , // Add some randomization to allow newer queues a chance
1141+ } ,
1142+ } ) ;
1143+
1144+ const now = Date . now ( ) ;
1145+
1146+ // Setup one environment with 5 queues of different ages
1147+ const queues = [
1148+ { age : 5000 , id : "queue-0" } , // Oldest
1149+ { age : 4000 , id : "queue-1" } ,
1150+ { age : 3000 , id : "queue-2" } ,
1151+ { age : 2000 , id : "queue-3" } ,
1152+ { age : 1000 , id : "queue-4" } , // Newest
1153+ ] ;
1154+
1155+ // Setup the environment and its queues
1156+ await setupConcurrency ( {
1157+ redis,
1158+ keyProducer,
1159+ env : { id : "env-1" , currentConcurrency : 0 , limit : 5 } ,
1160+ } ) ;
1161+
1162+ for ( const queue of queues ) {
1163+ await setupQueue ( {
1164+ redis,
1165+ keyProducer,
1166+ parentQueue : "parent-queue" ,
1167+ score : now - queue . age ,
1168+ queueId : queue . id ,
1169+ orgId : "org-1" ,
1170+ envId : "env-1" ,
1171+ } ) ;
1172+ }
1173+
1174+ // Run multiple iterations and track which queues are selected
1175+ const iterations = 1000 ;
1176+ const queueSelectionCounts : Record < string , number > = { } ;
1177+ const queuePairings : Record < string , number > = { } ;
1178+
1179+ for ( let i = 0 ; i < iterations ; i ++ ) {
1180+ const result = await strategy . distributeFairQueuesFromParentQueue (
1181+ "parent-queue" ,
1182+ `consumer-${ i } `
1183+ ) ;
1184+
1185+ // There should be exactly one environment
1186+ expect ( result . length ) . toBe ( 1 ) ;
1187+ const selectedQueues = result [ 0 ] . queues ;
1188+
1189+ // Should always get exactly 2 queues due to maximumQueuePerEnvCount
1190+ expect ( selectedQueues . length ) . toBe ( 2 ) ;
1191+
1192+ // Track individual queue selections
1193+ for ( const queueId of selectedQueues ) {
1194+ const baseQueueId = queueId . split ( ":" ) . pop ( ) ! ;
1195+ queueSelectionCounts [ baseQueueId ] = ( queueSelectionCounts [ baseQueueId ] || 0 ) + 1 ;
1196+ }
1197+
1198+ // Track queue pairings to ensure variety
1199+ const [ first , second ] = selectedQueues . map ( ( qId ) => qId . split ( ":" ) . pop ( ) ! ) . sort ( ) ;
1200+ const pairingKey = `${ first } -${ second } ` ;
1201+ queuePairings [ pairingKey ] = ( queuePairings [ pairingKey ] || 0 ) + 1 ;
1202+ }
1203+
1204+ console . log ( "\nQueue Selection Statistics:" ) ;
1205+ for ( const [ queueId , count ] of Object . entries ( queueSelectionCounts ) ) {
1206+ const percentage = ( count / ( iterations * 2 ) ) * 100 ; // Times 2 because we select 2 queues each time
1207+ console . log ( `${ queueId } : ${ percentage . toFixed ( 2 ) } % (${ count } times)` ) ;
1208+ }
1209+
1210+ console . log ( "\nQueue Pairing Statistics:" ) ;
1211+ for ( const [ pair , count ] of Object . entries ( queuePairings ) ) {
1212+ const percentage = ( count / iterations ) * 100 ;
1213+ console . log ( `${ pair } : ${ percentage . toFixed ( 2 ) } % (${ count } times)` ) ;
1214+ }
1215+
1216+ // Verify that all queues were selected at least once
1217+ for ( const queue of queues ) {
1218+ expect ( queueSelectionCounts [ queue . id ] ) . toBeGreaterThan ( 0 ) ;
1219+ }
1220+
1221+ // Calculate standard deviation of selection percentages
1222+ const selectionPercentages = Object . values ( queueSelectionCounts ) . map (
1223+ ( count ) => ( count / ( iterations * 2 ) ) * 100
1224+ ) ;
1225+ const stdDev = calculateStandardDeviation ( selectionPercentages ) ;
1226+
1227+ // The standard deviation should be reasonable given our age bias
1228+ // Higher stdDev means more bias towards older queues
1229+ // We expect some bias due to queueAgeRandomization being 0.3
1230+ expect ( stdDev ) . toBeLessThan ( 15 ) ; // Allow for age-based bias but not extreme
1231+
1232+ // Verify we get different pairings of queues
1233+ const uniquePairings = Object . keys ( queuePairings ) . length ;
1234+ // With 5 queues, we can have 10 possible unique pairs
1235+ expect ( uniquePairings ) . toBeGreaterThan ( 5 ) ; // Should see at least half of possible combinations
1236+ }
1237+ ) ;
11221238} ) ;
11231239
11241240// Helper function to flatten results for counting
0 commit comments