@@ -197,7 +197,8 @@ export class RunEngineTriggerTaskService {
197
197
}
198
198
199
199
const idempotencyKeyConcernResult = await this . idempotencyKeyConcern . handleTriggerRequest (
200
- triggerRequest
200
+ triggerRequest ,
201
+ parentRun ?. taskEventStore
201
202
) ;
202
203
203
204
if ( idempotencyKeyConcernResult . isCached ) {
@@ -266,105 +267,109 @@ export class RunEngineTriggerTaskService {
266
267
const workerQueue = await this . queueConcern . getWorkerQueue ( environment , body . options ?. region ) ;
267
268
268
269
try {
269
- return await this . traceEventConcern . traceRun ( triggerRequest , async ( event , store ) => {
270
- const result = await this . runNumberIncrementer . incrementRunNumber (
271
- triggerRequest ,
272
- async ( num ) => {
273
- event . setAttribute ( "queueName" , queueName ) ;
274
- span . setAttribute ( "queueName" , queueName ) ;
275
- event . setAttribute ( "runId" , runFriendlyId ) ;
276
- span . setAttribute ( "runId" , runFriendlyId ) ;
277
-
278
- const payloadPacket = await this . payloadProcessor . process ( triggerRequest ) ;
279
-
280
- const taskRun = await this . engine . trigger (
281
- {
282
- number : num ,
283
- friendlyId : runFriendlyId ,
284
- environment : environment ,
285
- idempotencyKey,
286
- idempotencyKeyExpiresAt : idempotencyKey ? idempotencyKeyExpiresAt : undefined ,
287
- taskIdentifier : taskId ,
288
- payload : payloadPacket . data ?? "" ,
289
- payloadType : payloadPacket . dataType ,
290
- context : body . context ,
291
- traceContext : this . #propagateExternalTraceContext(
292
- event . traceContext ,
293
- parentRun ?. traceContext ,
294
- event . traceparent ?. spanId
295
- ) ,
296
- traceId : event . traceId ,
297
- spanId : event . spanId ,
298
- parentSpanId :
299
- options . parentAsLinkType === "replay" ? undefined : event . traceparent ?. spanId ,
300
- replayedFromTaskRunFriendlyId : options . replayedFromTaskRunFriendlyId ,
301
- lockedToVersionId : lockedToBackgroundWorker ?. id ,
302
- taskVersion : lockedToBackgroundWorker ?. version ,
303
- sdkVersion : lockedToBackgroundWorker ?. sdkVersion ,
304
- cliVersion : lockedToBackgroundWorker ?. cliVersion ,
305
- concurrencyKey : body . options ?. concurrencyKey ,
306
- queue : queueName ,
307
- lockedQueueId,
308
- workerQueue,
309
- isTest : body . options ?. test ?? false ,
310
- delayUntil,
311
- queuedAt : delayUntil ? undefined : new Date ( ) ,
312
- maxAttempts : body . options ?. maxAttempts ,
313
- taskEventStore : store ,
314
- ttl,
315
- tags,
316
- oneTimeUseToken : options . oneTimeUseToken ,
317
- parentTaskRunId : parentRun ?. id ,
318
- rootTaskRunId : parentRun ?. rootTaskRunId ?? parentRun ?. id ,
319
- batch : options ?. batchId
320
- ? {
321
- id : options . batchId ,
322
- index : options . batchIndex ?? 0 ,
323
- }
324
- : undefined ,
325
- resumeParentOnCompletion : body . options ?. resumeParentOnCompletion ,
326
- depth,
327
- metadata : metadataPacket ?. data ,
328
- metadataType : metadataPacket ?. dataType ,
329
- seedMetadata : metadataPacket ?. data ,
330
- seedMetadataType : metadataPacket ?. dataType ,
331
- maxDurationInSeconds : body . options ?. maxDuration
332
- ? clampMaxDuration ( body . options . maxDuration )
333
- : undefined ,
334
- machine : body . options ?. machine ,
335
- priorityMs : body . options ?. priority ? body . options . priority * 1_000 : undefined ,
336
- queueTimestamp :
337
- options . queueTimestamp ??
338
- ( parentRun && body . options ?. resumeParentOnCompletion
339
- ? parentRun . queueTimestamp ?? undefined
340
- : undefined ) ,
341
- scheduleId : options . scheduleId ,
342
- scheduleInstanceId : options . scheduleInstanceId ,
343
- createdAt : options . overrideCreatedAt ,
344
- bulkActionId : body . options ?. bulkActionId ,
345
- planType,
346
- } ,
347
- this . prisma
348
- ) ;
349
-
350
- const error = taskRun . error ? TaskRunError . parse ( taskRun . error ) : undefined ;
351
-
352
- if ( error ) {
353
- event . failWithError ( error ) ;
270
+ return await this . traceEventConcern . traceRun (
271
+ triggerRequest ,
272
+ parentRun ?. taskEventStore ,
273
+ async ( event , store ) => {
274
+ const result = await this . runNumberIncrementer . incrementRunNumber (
275
+ triggerRequest ,
276
+ async ( num ) => {
277
+ event . setAttribute ( "queueName" , queueName ) ;
278
+ span . setAttribute ( "queueName" , queueName ) ;
279
+ event . setAttribute ( "runId" , runFriendlyId ) ;
280
+ span . setAttribute ( "runId" , runFriendlyId ) ;
281
+
282
+ const payloadPacket = await this . payloadProcessor . process ( triggerRequest ) ;
283
+
284
+ const taskRun = await this . engine . trigger (
285
+ {
286
+ number : num ,
287
+ friendlyId : runFriendlyId ,
288
+ environment : environment ,
289
+ idempotencyKey,
290
+ idempotencyKeyExpiresAt : idempotencyKey ? idempotencyKeyExpiresAt : undefined ,
291
+ taskIdentifier : taskId ,
292
+ payload : payloadPacket . data ?? "" ,
293
+ payloadType : payloadPacket . dataType ,
294
+ context : body . context ,
295
+ traceContext : this . #propagateExternalTraceContext(
296
+ event . traceContext ,
297
+ parentRun ?. traceContext ,
298
+ event . traceparent ?. spanId
299
+ ) ,
300
+ traceId : event . traceId ,
301
+ spanId : event . spanId ,
302
+ parentSpanId :
303
+ options . parentAsLinkType === "replay" ? undefined : event . traceparent ?. spanId ,
304
+ replayedFromTaskRunFriendlyId : options . replayedFromTaskRunFriendlyId ,
305
+ lockedToVersionId : lockedToBackgroundWorker ?. id ,
306
+ taskVersion : lockedToBackgroundWorker ?. version ,
307
+ sdkVersion : lockedToBackgroundWorker ?. sdkVersion ,
308
+ cliVersion : lockedToBackgroundWorker ?. cliVersion ,
309
+ concurrencyKey : body . options ?. concurrencyKey ,
310
+ queue : queueName ,
311
+ lockedQueueId,
312
+ workerQueue,
313
+ isTest : body . options ?. test ?? false ,
314
+ delayUntil,
315
+ queuedAt : delayUntil ? undefined : new Date ( ) ,
316
+ maxAttempts : body . options ?. maxAttempts ,
317
+ taskEventStore : store ,
318
+ ttl,
319
+ tags,
320
+ oneTimeUseToken : options . oneTimeUseToken ,
321
+ parentTaskRunId : parentRun ?. id ,
322
+ rootTaskRunId : parentRun ?. rootTaskRunId ?? parentRun ?. id ,
323
+ batch : options ?. batchId
324
+ ? {
325
+ id : options . batchId ,
326
+ index : options . batchIndex ?? 0 ,
327
+ }
328
+ : undefined ,
329
+ resumeParentOnCompletion : body . options ?. resumeParentOnCompletion ,
330
+ depth,
331
+ metadata : metadataPacket ?. data ,
332
+ metadataType : metadataPacket ?. dataType ,
333
+ seedMetadata : metadataPacket ?. data ,
334
+ seedMetadataType : metadataPacket ?. dataType ,
335
+ maxDurationInSeconds : body . options ?. maxDuration
336
+ ? clampMaxDuration ( body . options . maxDuration )
337
+ : undefined ,
338
+ machine : body . options ?. machine ,
339
+ priorityMs : body . options ?. priority ? body . options . priority * 1_000 : undefined ,
340
+ queueTimestamp :
341
+ options . queueTimestamp ??
342
+ ( parentRun && body . options ?. resumeParentOnCompletion
343
+ ? parentRun . queueTimestamp ?? undefined
344
+ : undefined ) ,
345
+ scheduleId : options . scheduleId ,
346
+ scheduleInstanceId : options . scheduleInstanceId ,
347
+ createdAt : options . overrideCreatedAt ,
348
+ bulkActionId : body . options ?. bulkActionId ,
349
+ planType,
350
+ } ,
351
+ this . prisma
352
+ ) ;
353
+
354
+ const error = taskRun . error ? TaskRunError . parse ( taskRun . error ) : undefined ;
355
+
356
+ if ( error ) {
357
+ event . failWithError ( error ) ;
358
+ }
359
+
360
+ return { run : taskRun , error, isCached : false } ;
354
361
}
362
+ ) ;
355
363
356
- return { run : taskRun , error, isCached : false } ;
364
+ if ( result ?. error ) {
365
+ throw new ServiceValidationError (
366
+ taskRunErrorToString ( taskRunErrorEnhancer ( result . error ) )
367
+ ) ;
357
368
}
358
- ) ;
359
369
360
- if ( result ?. error ) {
361
- throw new ServiceValidationError (
362
- taskRunErrorToString ( taskRunErrorEnhancer ( result . error ) )
363
- ) ;
370
+ return result ;
364
371
}
365
-
366
- return result ;
367
- } ) ;
372
+ ) ;
368
373
} catch ( error ) {
369
374
if ( error instanceof RunDuplicateIdempotencyKeyError ) {
370
375
//retry calling this function, because this time it will return the idempotent run
0 commit comments