@@ -14,7 +14,7 @@ import { env } from "~/env.server";
1414import { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
1515import { logger } from "~/services/logger.server" ;
1616import { getEntitlement } from "~/services/platform.v3.server" ;
17- import { workerQueue } from "~/services/worker .server" ;
17+ import { commonWorker } from "~/v3/commonWorker .server" ;
1818import { downloadPacketFromObjectStore , uploadPacketToObjectStore } from "../../v3/r2.server" ;
1919import { ServiceValidationError , WithRunEngine } from "../../v3/services/baseService.server" ;
2020import { OutOfEntitlementError , TriggerTaskService } from "../../v3/services/triggerTask.server" ;
@@ -244,88 +244,80 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
244244 }
245245 }
246246 } else {
247- return await $transaction ( this . _prisma , async ( tx ) => {
248- const batch = await tx . batchTaskRun . create ( {
249- data : {
250- id : BatchId . fromFriendlyId ( batchId ) ,
251- friendlyId : batchId ,
252- runtimeEnvironmentId : environment . id ,
253- runCount : body . items . length ,
254- runIds : [ ] ,
255- payload : payloadPacket . data ,
256- payloadType : payloadPacket . dataType ,
257- options,
258- batchVersion : "runengine:v1" ,
259- oneTimeUseToken : options . oneTimeUseToken ,
260- } ,
247+ const batch = await this . _prisma . batchTaskRun . create ( {
248+ data : {
249+ id : BatchId . fromFriendlyId ( batchId ) ,
250+ friendlyId : batchId ,
251+ runtimeEnvironmentId : environment . id ,
252+ runCount : body . items . length ,
253+ runIds : [ ] ,
254+ payload : payloadPacket . data ,
255+ payloadType : payloadPacket . dataType ,
256+ options,
257+ batchVersion : "runengine:v1" ,
258+ oneTimeUseToken : options . oneTimeUseToken ,
259+ } ,
260+ } ) ;
261+
262+ if ( body . parentRunId && body . resumeParentOnCompletion ) {
263+ await this . _engine . blockRunWithCreatedBatch ( {
264+ runId : RunId . fromFriendlyId ( body . parentRunId ) ,
265+ batchId : batch . id ,
266+ environmentId : environment . id ,
267+ projectId : environment . projectId ,
268+ organizationId : environment . organizationId ,
261269 } ) ;
270+ }
262271
263- if ( body . parentRunId && body . resumeParentOnCompletion ) {
264- await this . _engine . blockRunWithCreatedBatch ( {
265- runId : RunId . fromFriendlyId ( body . parentRunId ) ,
272+ switch ( this . _batchProcessingStrategy ) {
273+ case "sequential" : {
274+ await this . #enqueueBatchTaskRun ( {
266275 batchId : batch . id ,
267- environmentId : environment . id ,
268- projectId : environment . projectId ,
269- organizationId : environment . organizationId ,
270- tx,
276+ processingId : batchId ,
277+ range : { start : 0 , count : PROCESSING_BATCH_SIZE } ,
278+ attemptCount : 0 ,
279+ strategy : this . _batchProcessingStrategy ,
280+ parentRunId : body . parentRunId ,
281+ resumeParentOnCompletion : body . resumeParentOnCompletion ,
271282 } ) ;
272- }
273283
274- switch ( this . _batchProcessingStrategy ) {
275- case "sequential" : {
276- await this . #enqueueBatchTaskRun(
277- {
284+ break ;
285+ }
286+ case "parallel" : {
287+ const ranges = Array . from ( {
288+ length : Math . ceil ( body . items . length / PROCESSING_BATCH_SIZE ) ,
289+ } ) . map ( ( _ , index ) => ( {
290+ start : index * PROCESSING_BATCH_SIZE ,
291+ count : PROCESSING_BATCH_SIZE ,
292+ } ) ) ;
293+
294+ await Promise . all (
295+ ranges . map ( ( range , index ) =>
296+ this . #enqueueBatchTaskRun( {
278297 batchId : batch . id ,
279- processingId : batchId ,
280- range : { start : 0 , count : PROCESSING_BATCH_SIZE } ,
298+ processingId : ` ${ index } ` ,
299+ range,
281300 attemptCount : 0 ,
282301 strategy : this . _batchProcessingStrategy ,
283302 parentRunId : body . parentRunId ,
284303 resumeParentOnCompletion : body . resumeParentOnCompletion ,
285- } ,
286- tx
287- ) ;
288-
289- break ;
290- }
291- case "parallel" : {
292- const ranges = Array . from ( {
293- length : Math . ceil ( body . items . length / PROCESSING_BATCH_SIZE ) ,
294- } ) . map ( ( _ , index ) => ( {
295- start : index * PROCESSING_BATCH_SIZE ,
296- count : PROCESSING_BATCH_SIZE ,
297- } ) ) ;
298-
299- await Promise . all (
300- ranges . map ( ( range , index ) =>
301- this . #enqueueBatchTaskRun(
302- {
303- batchId : batch . id ,
304- processingId : `${ index } ` ,
305- range,
306- attemptCount : 0 ,
307- strategy : this . _batchProcessingStrategy ,
308- parentRunId : body . parentRunId ,
309- resumeParentOnCompletion : body . resumeParentOnCompletion ,
310- } ,
311- tx
312- )
313- )
314- ) ;
304+ } )
305+ )
306+ ) ;
315307
316- break ;
317- }
308+ break ;
318309 }
310+ }
319311
320- return batch ;
321- } ) ;
312+ return batch ;
322313 }
323314 }
324315
325316 async #enqueueBatchTaskRun( options : BatchProcessingOptions , tx ?: PrismaClientOrTransaction ) {
326- await workerQueue . enqueue ( "runengine.processBatchTaskRun" , options , {
327- tx,
328- jobKey : `RunEngineBatchTriggerService.process:${ options . batchId } :${ options . processingId } ` ,
317+ await commonWorker . enqueue ( {
318+ id : `RunEngineBatchTriggerService.process:${ options . batchId } :${ options . processingId } ` ,
319+ job : "runengine.processBatchTaskRun" ,
320+ payload : options ,
329321 } ) ;
330322 }
331323
0 commit comments