@@ -8,6 +8,20 @@ import seedrandom from "seedrandom";
88import { Tracer } from "@opentelemetry/api" ;
99import { startSpan } from "../tracing.server" ;
1010
11+ export type FairDequeuingStrategyBiases = {
12+ /**
13+ * How much to bias towards environments with higher concurrency limits
14+ * 0 = no bias, 1 = full bias based on limit differences
15+ */
16+ concurrencyLimitBias : number ;
17+
18+ /**
19+ * How much to bias towards environments with more available capacity
20+ * 0 = no bias, 1 = full bias based on available capacity
21+ */
22+ availableCapacityBias : number ;
23+ } ;
24+
1125export type FairDequeuingStrategyOptions = {
1226 redis : Redis ;
1327 keys : MarQSKeyProducer ;
@@ -17,6 +31,11 @@ export type FairDequeuingStrategyOptions = {
1731 checkForDisabledOrgs : boolean ;
1832 tracer : Tracer ;
1933 seed ?: string ;
34+ /**
35+ * Configure biasing for environment shuffling
36+ * If not provided, no biasing will be applied (completely random shuffling)
37+ */
38+ biases ?: FairDequeuingStrategyBiases ;
2039} ;
2140
2241type FairQueueConcurrency = {
@@ -33,13 +52,23 @@ type FairQueueSnapshot = {
3352 queues : Array < FairQueue > ;
3453} ;
3554
55+ type WeightedEnv = {
56+ envId : string ;
57+ weight : number ;
58+ } ;
59+
3660const emptyFairQueueSnapshot : FairQueueSnapshot = {
3761 id : "empty" ,
3862 orgs : { } ,
3963 envs : { } ,
4064 queues : [ ] ,
4165} ;
4266
67+ const defaultBiases : FairDequeuingStrategyBiases = {
68+ concurrencyLimitBias : 0 ,
69+ availableCapacityBias : 0 ,
70+ } ;
71+
4372export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
4473 private _cache : UnkeyCache < {
4574 concurrencyLimit : number ;
@@ -107,30 +136,85 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
107136 ) ;
108137 }
109138
110- // Now we need to:
111- // 1. Shuffle the environments
112- // 2. Sort the queues by their environment order in the shuffled list
113- // 3. Keep the queues sorted by their age inside their "environment" slice of the final array
114139 #shuffleQueuesByEnv( snapshot : FairQueueSnapshot ) : Array < string > {
115140 const envs = Object . keys ( snapshot . envs ) ;
141+ const biases = this . options . biases ?? defaultBiases ;
142+
143+ if ( biases . concurrencyLimitBias === 0 && biases . availableCapacityBias === 0 ) {
144+ const shuffledEnvs = this . #shuffle( envs ) ;
145+ return this . #orderQueuesByEnvs( shuffledEnvs , snapshot ) ;
146+ }
116147
117- const shuffledEnvs = this . #shuffle( envs ) ;
148+ // Find the maximum concurrency limit for normalization
149+ const maxLimit = Math . max ( ...envs . map ( ( envId ) => snapshot . envs [ envId ] . concurrency . limit ) ) ;
118150
151+ // Calculate weights for each environment
152+ const weightedEnvs : WeightedEnv [ ] = envs . map ( ( envId ) => {
153+ const env = snapshot . envs [ envId ] ;
154+
155+ // Start with base weight of 1
156+ let weight = 1 ;
157+
158+ // Add normalized concurrency limit bias if configured
159+ if ( biases . concurrencyLimitBias > 0 ) {
160+ const normalizedLimit = env . concurrency . limit / maxLimit ;
161+ // Square or cube the bias to make it more pronounced at higher values
162+ weight *= 1 + Math . pow ( normalizedLimit * biases . concurrencyLimitBias , 2 ) ;
163+ }
164+
165+ // Add available capacity bias if configured
166+ if ( biases . availableCapacityBias > 0 ) {
167+ const usedCapacityPercentage = env . concurrency . current / env . concurrency . limit ;
168+ const availableCapacityBonus = 1 - usedCapacityPercentage ;
169+ // Square or cube the bias to make it more pronounced at higher values
170+ weight *= 1 + Math . pow ( availableCapacityBonus * biases . availableCapacityBias , 2 ) ;
171+ }
172+
173+ return { envId, weight } ;
174+ } ) ;
175+
176+ const shuffledEnvs = this . #weightedShuffle( weightedEnvs ) ;
177+ return this . #orderQueuesByEnvs( shuffledEnvs , snapshot ) ;
178+ }
179+
180+ #weightedShuffle( weightedItems : WeightedEnv [ ] ) : string [ ] {
181+ const totalWeight = weightedItems . reduce ( ( sum , item ) => sum + item . weight , 0 ) ;
182+ const result : string [ ] = [ ] ;
183+ const items = [ ...weightedItems ] ;
184+
185+ while ( items . length > 0 ) {
186+ let random = this . _rng ( ) * totalWeight ;
187+ let index = 0 ;
188+
189+ // Find item based on weighted random selection
190+ while ( random > 0 && index < items . length ) {
191+ random -= items [ index ] . weight ;
192+ index ++ ;
193+ }
194+ index = Math . max ( 0 , index - 1 ) ;
195+
196+ // Add selected item to result and remove from items
197+ result . push ( items [ index ] . envId ) ;
198+ items . splice ( index , 1 ) ;
199+ }
200+
201+ return result ;
202+ }
203+
204+ // Helper method to maintain DRY principle
205+ #orderQueuesByEnvs( envs : string [ ] , snapshot : FairQueueSnapshot ) : Array < string > {
119206 const queuesByEnv = snapshot . queues . reduce ( ( acc , queue ) => {
120207 if ( ! acc [ queue . env ] ) {
121208 acc [ queue . env ] = [ ] ;
122209 }
123-
124210 acc [ queue . env ] . push ( queue ) ;
125-
126211 return acc ;
127212 } , { } as Record < string , Array < FairQueue > > ) ;
128213
129- const queues = shuffledEnvs . reduce ( ( acc , envId ) => {
214+ const queues = envs . reduce ( ( acc , envId ) => {
130215 if ( queuesByEnv [ envId ] ) {
131216 acc . push ( ...queuesByEnv [ envId ] . sort ( ( a , b ) => b . age - a . age ) ) ;
132217 }
133-
134218 return acc ;
135219 } , [ ] as Array < FairQueue > ) ;
136220
0 commit comments