1- import { timeoutError } from "@trigger.dev/core/v3" ;
1+ import { timeoutError , tryCatch } from "@trigger.dev/core/v3" ;
22import { WaitpointId } from "@trigger.dev/core/v3/isomorphic" ;
33import {
44 $transaction ,
@@ -66,65 +66,51 @@ export class WaitpointSystem {
6666 isError : boolean ;
6767 } ;
6868 } ) : Promise < Waitpoint > {
69- const result = await $transaction (
70- this . $ . prisma ,
71- async ( tx ) => {
72- // 1. Find the TaskRuns blocked by this waitpoint
73- const affectedTaskRuns = await tx . taskRunWaitpoint . findMany ( {
74- where : { waitpointId : id } ,
75- select : { taskRunId : true , spanIdToComplete : true , createdAt : true } ,
76- } ) ;
69+ // 1. Find the TaskRuns blocked by this waitpoint
70+ const affectedTaskRuns = await this . $ . prisma . taskRunWaitpoint . findMany ( {
71+ where : { waitpointId : id } ,
72+ select : { taskRunId : true , spanIdToComplete : true , createdAt : true } ,
73+ } ) ;
7774
78- if ( affectedTaskRuns . length === 0 ) {
79- this . $ . logger . warn ( `completeWaitpoint: No TaskRunWaitpoints found for waitpoint` , {
80- waitpointId : id ,
81- } ) ;
82- }
75+ if ( affectedTaskRuns . length === 0 ) {
76+ this . $ . logger . debug ( `completeWaitpoint: No TaskRunWaitpoints found for waitpoint` , {
77+ waitpointId : id ,
78+ } ) ;
79+ }
8380
84- // 2. Update the waitpoint to completed (only if it's pending)
85- let waitpoint : Waitpoint | null = null ;
86- try {
87- waitpoint = await tx . waitpoint . update ( {
88- where : { id, status : "PENDING" } ,
89- data : {
90- status : "COMPLETED" ,
91- completedAt : new Date ( ) ,
92- output : output ?. value ,
93- outputType : output ?. type ,
94- outputIsError : output ?. isError ,
95- } ,
96- } ) ;
97- } catch ( error ) {
98- if ( error instanceof Prisma . PrismaClientKnownRequestError && error . code === "P2025" ) {
99- waitpoint = await tx . waitpoint . findFirst ( {
100- where : { id } ,
101- } ) ;
102- } else {
103- this . $ . logger . log ( "completeWaitpoint: error updating waitpoint:" , { error } ) ;
104- throw error ;
105- }
106- }
81+ let [ waitpointError , waitpoint ] = await tryCatch (
82+ this . $ . prisma . waitpoint . update ( {
83+ where : { id, status : "PENDING" } ,
84+ data : {
85+ status : "COMPLETED" ,
86+ completedAt : new Date ( ) ,
87+ output : output ?. value ,
88+ outputType : output ?. type ,
89+ outputIsError : output ?. isError ,
90+ } ,
91+ } )
92+ ) ;
10793
108- return { waitpoint, affectedTaskRuns } ;
109- } ,
110- ( error ) => {
111- this . $ . logger . error ( `completeWaitpoint: Error completing waitpoint ${ id } , retrying` , {
112- error,
94+ if ( waitpointError ) {
95+ if (
96+ waitpointError instanceof Prisma . PrismaClientKnownRequestError &&
97+ waitpointError . code === "P2025"
98+ ) {
99+ waitpoint = await this . $ . prisma . waitpoint . findFirst ( {
100+ where : { id } ,
113101 } ) ;
114- throw error ;
102+ } else {
103+ this . $ . logger . log ( "completeWaitpoint: error updating waitpoint:" , { waitpointError } ) ;
104+ throw waitpointError ;
115105 }
116- ) ;
117-
118- if ( ! result ) {
119- throw new Error ( `Waitpoint couldn't be updated` ) ;
120106 }
121107
122- if ( ! result . waitpoint ) {
108+ if ( ! waitpoint ) {
123109 throw new Error ( `Waitpoint ${ id } not found` ) ;
124110 }
125111
126112 //schedule trying to continue the runs
127- for ( const run of result . affectedTaskRuns ) {
113+ for ( const run of affectedTaskRuns ) {
128114 await this . $ . worker . enqueue ( {
129115 //this will debounce the call
130116 id : `continueRunIfUnblocked:${ run . taskRunId } ` ,
@@ -148,7 +134,7 @@ export class WaitpointSystem {
148134 }
149135 }
150136
151- return result . waitpoint ;
137+ return waitpoint ;
152138 }
153139
154140 /**
0 commit comments