11import { Attributes } from "@opentelemetry/api" ;
22import {
33 TaskRunContext ,
4+ TaskRunError ,
45 TaskRunErrorCodes ,
56 TaskRunExecution ,
67 TaskRunExecutionResult ,
78 TaskRunExecutionRetry ,
89 TaskRunFailedExecutionResult ,
910 TaskRunSuccessfulExecutionResult ,
11+ exceptionEventEnhancer ,
1012 flattenAttributes ,
13+ internalErrorFromUnexpectedExit ,
1114 sanitizeError ,
1215 shouldRetryError ,
1316 taskRunErrorEnhancer ,
@@ -233,7 +236,7 @@ export class CompleteAttemptService extends BaseService {
233236
234237 if ( ! executionRetry && shouldInfer ) {
235238 executionRetryInferred = true ;
236- executionRetry = await FailedTaskRunRetryHelper . getExecutionRetry ( {
239+ executionRetry = FailedTaskRunRetryHelper . getExecutionRetry ( {
237240 run : {
238241 ...taskRunAttempt . taskRun ,
239242 lockedBy : taskRunAttempt . backgroundWorkerTask ,
@@ -243,7 +246,47 @@ export class CompleteAttemptService extends BaseService {
243246 } ) ;
244247 }
245248
246- const retriableError = shouldRetryError ( taskRunErrorEnhancer ( completion . error ) ) ;
249+ let retriableError = shouldRetryError ( taskRunErrorEnhancer ( completion . error ) ) ;
250+ let isOOMRetry = false ;
251+
252+ //OOM errors should retry (if an OOM machine is specified)
253+ if ( isOOMError ( completion . error ) ) {
254+ const retryConfig = FailedTaskRunRetryHelper . getRetryConfig ( {
255+ run : {
256+ ...taskRunAttempt . taskRun ,
257+ lockedBy : taskRunAttempt . backgroundWorkerTask ,
258+ lockedToVersion : taskRunAttempt . backgroundWorker ,
259+ } ,
260+ execution,
261+ } ) ;
262+
263+ if (
264+ retryConfig ?. outOfMemory ?. machine &&
265+ retryConfig . outOfMemory . machine !== taskRunAttempt . taskRun . machinePreset
266+ ) {
267+ //we will retry
268+ isOOMRetry = true ;
269+ retriableError = true ;
270+ executionRetry = FailedTaskRunRetryHelper . getExecutionRetry ( {
271+ run : {
272+ ...taskRunAttempt . taskRun ,
273+ lockedBy : taskRunAttempt . backgroundWorkerTask ,
274+ lockedToVersion : taskRunAttempt . backgroundWorker ,
275+ } ,
276+ execution,
277+ } ) ;
278+
279+ //update the machine on the run
280+ await this . _prisma . taskRun . update ( {
281+ where : {
282+ id : taskRunAttempt . taskRunId ,
283+ } ,
284+ data : {
285+ machinePreset : retryConfig . outOfMemory . machine ,
286+ } ,
287+ } ) ;
288+ }
289+ }
247290
248291 if (
249292 retriableError &&
@@ -257,6 +300,7 @@ export class CompleteAttemptService extends BaseService {
257300 taskRunAttempt,
258301 environment,
259302 checkpoint,
303+ forceRequeue : isOOMRetry ,
260304 } ) ;
261305 }
262306
@@ -378,12 +422,14 @@ export class CompleteAttemptService extends BaseService {
378422 executionRetryInferred,
379423 checkpointEventId,
380424 supportsLazyAttempts,
425+ forceRequeue = false ,
381426 } : {
382427 run : TaskRun ;
383428 executionRetry : TaskRunExecutionRetry ;
384429 executionRetryInferred : boolean ;
385430 checkpointEventId ?: string ;
386431 supportsLazyAttempts : boolean ;
432+ forceRequeue ?: boolean ;
387433 } ) {
388434 const retryViaQueue = ( ) => {
389435 logger . debug ( "[CompleteAttemptService] Enqueuing retry attempt" , { runId : run . id } ) ;
@@ -434,6 +480,12 @@ export class CompleteAttemptService extends BaseService {
434480 return ;
435481 }
436482
483+ if ( forceRequeue ) {
484+ logger . debug ( "[CompleteAttemptService] Forcing retry via queue" , { runId : run . id } ) ;
485+ await retryViaQueue ( ) ;
486+ return ;
487+ }
488+
437489 // Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold
438490 if (
439491 ! this . opts . supportsRetryCheckpoints &&
@@ -466,13 +518,15 @@ export class CompleteAttemptService extends BaseService {
466518 taskRunAttempt,
467519 environment,
468520 checkpoint,
521+ forceRequeue = false ,
469522 } : {
470523 execution : TaskRunExecution ;
471524 executionRetry : TaskRunExecutionRetry ;
472525 executionRetryInferred : boolean ;
473526 taskRunAttempt : NonNullable < FoundAttempt > ;
474527 environment : AuthenticatedEnvironment ;
475528 checkpoint ?: CheckpointData ;
529+ forceRequeue ?: boolean ;
476530 } ) {
477531 const retryAt = new Date ( executionRetry . timestamp ) ;
478532
@@ -533,6 +587,7 @@ export class CompleteAttemptService extends BaseService {
533587 executionRetry,
534588 supportsLazyAttempts : taskRunAttempt . backgroundWorker . supportsLazyAttempts ,
535589 executionRetryInferred,
590+ forceRequeue,
536591 } ) ;
537592
538593 return "RETRIED" ;
@@ -634,3 +689,12 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId:
634689 } ,
635690 } ) ;
636691}
692+
693+ function isOOMError ( error : TaskRunError ) {
694+ if ( error . type !== "INTERNAL_ERROR" ) return false ;
695+ if ( error . code === "TASK_PROCESS_OOM_KILLED" || error . code === "TASK_PROCESS_MAYBE_OOM_KILLED" ) {
696+ return true ;
697+ }
698+
699+ return false ;
700+ }
0 commit comments