@@ -24,6 +24,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters";
2424import parseDuration from "parse-duration" ;
2525import { v3BulkActionPath } from "~/utils/pathBuilder" ;
2626import { formatDateTime } from "~/components/primitives/DateTime" ;
27+ import pMap from "p-map" ;
2728
2829export class BulkActionService extends BaseService {
2930 public async create (
@@ -191,29 +192,33 @@ export class BulkActionService extends BaseService {
191192 } ,
192193 } ) ;
193194
194- for ( const run of runs ) {
195- const [ error , result ] = await tryCatch (
196- cancelService . call ( run , {
197- reason : `Bulk action ${ group . friendlyId } cancelled run` ,
198- bulkActionId : bulkActionId ,
199- } )
200- ) ;
201- if ( error ) {
202- logger . error ( "Failed to cancel run" , {
203- error,
204- runId : run . id ,
205- status : run . status ,
206- } ) ;
195+ await pMap (
196+ runs ,
197+ async ( run ) => {
198+ const [ error , result ] = await tryCatch (
199+ cancelService . call ( run , {
200+ reason : `Bulk action ${ group . friendlyId } cancelled run` ,
201+ bulkActionId : bulkActionId ,
202+ } )
203+ ) ;
204+ if ( error ) {
205+ logger . error ( "Failed to cancel run" , {
206+ error,
207+ runId : run . id ,
208+ status : run . status ,
209+ } ) ;
207210
208- failureCount ++ ;
209- } else {
210- if ( ! result || result . alreadyFinished ) {
211211 failureCount ++ ;
212212 } else {
213- successCount ++ ;
213+ if ( ! result || result . alreadyFinished ) {
214+ failureCount ++ ;
215+ } else {
216+ successCount ++ ;
217+ }
214218 }
215- }
216- }
219+ } ,
220+ { concurrency : env . BULK_ACTION_SUBBATCH_CONCURRENCY }
221+ ) ;
217222
218223 break ;
219224 }
@@ -228,33 +233,37 @@ export class BulkActionService extends BaseService {
228233 } ,
229234 } ) ;
230235
231- for ( const run of runs ) {
232- const [ error , result ] = await tryCatch (
233- replayService . call ( run , {
234- bulkActionId : bulkActionId ,
235- } )
236- ) ;
237- if ( error ) {
238- logger . error ( "Failed to replay run, error" , {
239- error,
240- runId : run . id ,
241- status : run . status ,
242- } ) ;
243-
244- failureCount ++ ;
245- } else {
246- if ( ! result ) {
247- logger . error ( "Failed to replay run, no result" , {
236+ await pMap (
237+ runs ,
238+ async ( run ) => {
239+ const [ error , result ] = await tryCatch (
240+ replayService . call ( run , {
241+ bulkActionId : bulkActionId ,
242+ } )
243+ ) ;
244+ if ( error ) {
245+ logger . error ( "Failed to replay run, error" , {
246+ error,
248247 runId : run . id ,
249248 status : run . status ,
250249 } ) ;
251250
252251 failureCount ++ ;
253252 } else {
254- successCount ++ ;
253+ if ( ! result ) {
254+ logger . error ( "Failed to replay run, no result" , {
255+ runId : run . id ,
256+ status : run . status ,
257+ } ) ;
258+
259+ failureCount ++ ;
260+ } else {
261+ successCount ++ ;
262+ }
255263 }
256- }
257- }
264+ } ,
265+ { concurrency : env . BULK_ACTION_SUBBATCH_CONCURRENCY }
266+ ) ;
258267 break ;
259268 }
260269 }
0 commit comments