@@ -2,23 +2,24 @@ import {
22 CoordinatorToPlatformMessages ,
33 TaskRunExecution ,
44 TaskRunExecutionResult ,
5- WaitReason ,
65} from "@trigger.dev/core/v3" ;
76import type { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket" ;
87import { $transaction , PrismaClientOrTransaction } from "~/db.server" ;
98import { logger } from "~/services/logger.server" ;
109import { marqs } from "~/v3/marqs/index.server" ;
1110import { socketIo } from "../handleSocketIo.server" ;
12- import { SharedQueueMessageBody , sharedQueueTasks } from "../marqs/sharedQueueConsumer.server" ;
11+ import { sharedQueueTasks } from "../marqs/sharedQueueConsumer.server" ;
1312import { BaseService } from "./baseService.server" ;
1413import { TaskRunAttempt } from "@trigger.dev/database" ;
1514import { isFinalRunStatus } from "../taskStatus" ;
1615
1716export class ResumeAttemptService extends BaseService {
17+ private _logger = logger ;
18+
1819 public async call (
1920 params : InferSocketMessageSchema < typeof CoordinatorToPlatformMessages , "READY_FOR_RESUME" >
2021 ) : Promise < void > {
21- logger . debug ( `ResumeAttemptService.call()` , params ) ;
22+ this . _logger . debug ( `ResumeAttemptService.call()` , params ) ;
2223
2324 await $transaction ( this . _prisma , async ( tx ) => {
2425 const attempt = await tx . taskRunAttempt . findUnique ( {
@@ -77,27 +78,26 @@ export class ResumeAttemptService extends BaseService {
7778 } ) ;
7879
7980 if ( ! attempt ) {
80- logger . error ( "Could not find attempt" , { attemptFriendlyId : params . attemptFriendlyId } ) ;
81+ this . _logger . error ( "Could not find attempt" , params ) ;
8182 return ;
8283 }
8384
85+ this . _logger = logger . child ( {
86+ attemptId : attempt . id ,
87+ attemptFriendlyId : attempt . friendlyId ,
88+ taskRun : attempt . taskRun ,
89+ } ) ;
90+
8491 if ( isFinalRunStatus ( attempt . taskRun . status ) ) {
85- logger . error ( "Run is not resumable" , {
86- attemptId : attempt . id ,
87- runId : attempt . taskRunId ,
88- status : attempt . taskRun . status ,
89- } ) ;
92+ this . _logger . error ( "Run is not resumable" ) ;
9093 return ;
9194 }
9295
9396 let completedAttemptIds : string [ ] = [ ] ;
9497
9598 switch ( params . type ) {
9699 case "WAIT_FOR_DURATION" : {
97- logger . debug ( "Sending duration wait resume message" , {
98- attemptId : attempt . id ,
99- attemptFriendlyId : params . attemptFriendlyId ,
100- } ) ;
100+ this . _logger . debug ( "Sending duration wait resume message" ) ;
101101
102102 await this . #setPostResumeStatuses( attempt , tx ) ;
103103
@@ -114,13 +114,13 @@ export class ResumeAttemptService extends BaseService {
114114 const dependentAttempt = attempt . dependencies [ 0 ] . taskRun . attempts [ 0 ] ;
115115
116116 if ( ! dependentAttempt ) {
117- logger . error ( "No dependent attempt" , { attemptId : attempt . id } ) ;
117+ this . _logger . error ( "No dependent attempt" ) ;
118118 return ;
119119 }
120120
121121 completedAttemptIds = [ dependentAttempt . id ] ;
122122 } else {
123- logger . error ( "No task dependency" , { attemptId : attempt . id } ) ;
123+ this . _logger . error ( "No task dependency" ) ;
124124 return ;
125125 }
126126
@@ -134,13 +134,13 @@ export class ResumeAttemptService extends BaseService {
134134 const dependentBatchItems = attempt . batchDependencies [ 0 ] . items ;
135135
136136 if ( ! dependentBatchItems ) {
137- logger . error ( "No dependent batch items" , { attemptId : attempt . id } ) ;
137+ this . _logger . error ( "No dependent batch items" ) ;
138138 return ;
139139 }
140140
141141 completedAttemptIds = dependentBatchItems . map ( ( item ) => item . taskRun . attempts [ 0 ] ?. id ) ;
142142 } else {
143- logger . error ( "No batch dependency" , { attemptId : attempt . id } ) ;
143+ this . _logger . error ( "No batch dependency" ) ;
144144 return ;
145145 }
146146
@@ -161,7 +161,7 @@ export class ResumeAttemptService extends BaseService {
161161 tx : PrismaClientOrTransaction
162162 ) {
163163 if ( completedAttemptIds . length === 0 ) {
164- logger . error ( "No completed attempt IDs" , { attemptId : attempt . id } ) ;
164+ this . _logger . error ( "No completed attempt IDs" ) ;
165165 return ;
166166 }
167167
@@ -184,23 +184,23 @@ export class ResumeAttemptService extends BaseService {
184184 } ) ;
185185
186186 if ( ! completedAttempt ) {
187- logger . error ( "Completed attempt not found" , {
188- attemptId : attempt . id ,
189- completedAttemptId,
190- } ) ;
187+ this . _logger . error ( "Completed attempt not found" , { completedAttemptId } ) ;
191188 await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
192189 return ;
193190 }
194191
192+ const logger = this . _logger . child ( {
193+ completedAttemptId : completedAttempt . id ,
194+ completedAttemptFriendlyId : completedAttempt . friendlyId ,
195+ completedRunId : completedAttempt . taskRunId ,
196+ } ) ;
197+
195198 const completion = await sharedQueueTasks . getCompletionPayloadFromAttempt (
196199 completedAttempt . id
197200 ) ;
198201
199202 if ( ! completion ) {
200- logger . error ( "Failed to get completion payload" , {
201- attemptId : attempt . id ,
202- completedAttemptId,
203- } ) ;
203+ logger . error ( "Failed to get completion payload" ) ;
204204 await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
205205 return ;
206206 }
@@ -212,10 +212,7 @@ export class ResumeAttemptService extends BaseService {
212212 ) ;
213213
214214 if ( ! executionPayload ) {
215- logger . error ( "Failed to get execution payload" , {
216- attemptId : attempt . id ,
217- completedAttemptId,
218- } ) ;
215+ logger . error ( "Failed to get execution payload" ) ;
219216 await marqs ?. acknowledgeMessage ( attempt . taskRunId ) ;
220217 return ;
221218 }
0 commit comments