@@ -14,6 +14,8 @@ export default class Queue {
1414 private ctx : Context ;
1515 private snapshotNames : Array < string > = [ ] ;
1616 private variants : Array < string > = [ ] ;
17+ private activeProcessingCount : number = 0 ;
18+ private readonly MAX_CONCURRENT_PROCESSING = 5 ;
1719
1820 constructor ( ctx : Context ) {
1921 this . ctx = ctx ;
@@ -275,15 +277,59 @@ export default class Queue {
275277
276278 private async processNext ( ) : Promise < void > {
277279 if ( ! this . isEmpty ( ) ) {
280+ const useRemoteDiscovery = this . ctx . env . USE_REMOTE_DISCOVERY || this . ctx . config . useRemoteDiscovery ;
281+
282+ if ( useRemoteDiscovery && ! this . ctx . config . delayedUpload && ! this . ctx . config . allowDuplicateSnapshotNames ) {
283+ const maxConcurrentProcessing = this . ctx . env . MAX_CONCURRENT_PROCESSING === 0 ? this . MAX_CONCURRENT_PROCESSING : this . ctx . env . MAX_CONCURRENT_PROCESSING ;
284+ const snapshotsToProcess : Array < Snapshot > = [ ] ;
285+ const maxSnapshots = Math . min ( maxConcurrentProcessing - this . activeProcessingCount , this . snapshots . length ) ;
286+
287+ for ( let i = 0 ; i < maxSnapshots ; i ++ ) {
288+ let snapshot ;
289+ if ( this . ctx . config . delayedUpload ) {
290+ snapshot = this . snapshots . pop ( ) ;
291+ } else {
292+ snapshot = this . snapshots . shift ( ) ;
293+ }
294+ if ( snapshot ) {
295+ snapshotsToProcess . push ( snapshot ) ;
296+ }
297+ }
298+
299+ if ( snapshotsToProcess . length > 0 ) {
300+ this . activeProcessingCount += snapshotsToProcess . length ;
301+ const processingPromises = snapshotsToProcess . map ( snapshot => this . processSnapshot ( snapshot ) ) ;
302+ await Promise . allSettled ( processingPromises ) ;
303+ this . activeProcessingCount -= snapshotsToProcess . length ;
304+
305+ if ( ! this . isEmpty ( ) ) {
306+ this . processNext ( ) ;
307+ } else {
308+ this . processing = false ;
309+ }
310+ return ;
311+ }
312+ }
313+
278314 let snapshot ;
279315 if ( this . ctx . config . delayedUpload ) {
280316 snapshot = this . snapshots . pop ( ) ;
281317 } else {
282318 snapshot = this . snapshots . shift ( ) ;
283319 }
284- try {
285- this . processingSnapshot = snapshot ?. name ;
286- let drop = false ;
320+ if ( snapshot ) {
321+ await this . processSnapshot ( snapshot ) ;
322+ this . processNext ( ) ;
323+ }
324+ } else {
325+ this . processing = false ;
326+ }
327+ }
328+
329+ private async processSnapshot ( snapshot : Snapshot ) : Promise < void > {
330+ try {
331+ this . processingSnapshot = snapshot ?. name ;
332+ let drop = false ;
287333
288334
289335 if ( this . ctx . isStartExec ) {
@@ -450,7 +496,6 @@ export default class Queue {
450496 if ( snapshot ?. options ?. contextId ) {
451497 this . ctx . contextToSnapshotMap ?. set ( snapshot ?. options ?. contextId , '2' ) ;
452498 }
453- this . processNext ( ) ;
454499 } else {
455500 let approvalThreshold = snapshot ?. options ?. approvalThreshold || this . ctx . config . approvalThreshold ;
456501 let rejectionThreshold = snapshot ?. options ?. rejectionThreshold || this . ctx . config . rejectionThreshold ;
@@ -487,10 +532,6 @@ export default class Queue {
487532 this . ctx . log . debug ( `Closed browser context for snapshot ${ snapshot . name } ` ) ;
488533 }
489534 }
490- this . processNext ( ) ;
491- } else {
492- this . processing = false ;
493- }
494535 }
495536
496537 isProcessing ( ) : boolean {
0 commit comments