@@ -41,6 +41,7 @@ type PipelineRunner struct {
 	// persistRequests is for triggering saving-the-store, which is then handled asynchronously, at most every 3 seconds (see NewPipelineRunner)
 	// externally, call requestPersist()
 	persistRequests chan struct{}
+	persistLoopDone chan struct{}

 	// Mutex for reading or writing pipeline definitions (defs), jobs and job state
 	mx sync.RWMutex
@@ -50,13 +51,17 @@ type PipelineRunner struct {
 	wg sync.WaitGroup
 	// Flag if the runner is shutting down
 	isShuttingDown bool
+	// shutdownCancel is the cancel function for the shutdown context (stops the persist loop)
+	shutdownCancel context.CancelFunc

 	// Poll interval for completed jobs for graceful shutdown
 	ShutdownPollInterval time.Duration
 }

 // NewPipelineRunner creates the central data structure which controls the full runner state; so this knows what is currently running
 func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, createTaskRunner func(j *PipelineJob) taskctl.Runner, store store.DataStore, outputStore taskctl.OutputStore) (*PipelineRunner, error) {
+	ctx, cancel := context.WithCancel(ctx)
+
 	pRunner := &PipelineRunner{
 		defs: defs,
 		// jobsByID contains ALL jobs, no matter whether they are on the waitlist or are scheduled or cancelled.
@@ -69,6 +74,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
 		outputStore: outputStore,
 		// Use channel buffered with one extra slot, so we can keep save requests while a save is running without blocking
 		persistRequests: make(chan struct{}, 1),
+		persistLoopDone: make(chan struct{}),
+		shutdownCancel: cancel,
 		createTaskRunner: createTaskRunner,
 		ShutdownPollInterval: 3 * time.Second,
 	}
@@ -80,6 +87,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
 	}

 	go func() {
+		defer close(pRunner.persistLoopDone) // Signal that the persist loop is done on shutdown
+
 		for {
 			select {
 			case <-ctx.Done():
@@ -123,6 +132,8 @@ type PipelineJob struct {
 	// Tasks is an in-memory representation with state of tasks, sorted by dependencies
 	Tasks jobTasks
 	LastError error
+	// firstFailedTask is a reference to the first task that failed in this job
+	firstFailedTask *jobTask

 	sched *taskctl.Scheduler
 	taskRunner runner.Runner
@@ -411,7 +422,7 @@ func (r *PipelineRunner) startJob(job *PipelineJob) {
 	}()
 }
 func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) {
-	errorGraph, err := r.buildErrorGraph(job)
+	errorGraph, err := r.BuildErrorGraph(job)
 	if err != nil {
 		log.
 			WithError(err).
@@ -450,9 +461,10 @@ func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) {

 const OnErrorTaskName = "on_error"

-func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) {
+func (r *PipelineRunner) BuildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) {
 	r.mx.RLock()
 	defer r.mx.RUnlock()
+
 	pipelineDef, pipelineDefExists := r.defs.Pipelines[job.Pipeline]
 	if !pipelineDefExists {
 		return nil, fmt.Errorf("pipeline definition not found for pipeline %s (should never happen)", job.Pipeline)
@@ -463,8 +475,7 @@ func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.Execution
 		return nil, nil
 	}

-	// we assume the 1st failed task (by end date) is the root cause, because this triggered a cascading abort then.
-	failedTask := findFirstFailedTaskByEndDate(job.Tasks)
+	failedTask := job.firstFailedTask

 	failedTaskStdout := r.readTaskOutputBestEffort(job, failedTask, "stdout")
 	failedTaskStderr := r.readTaskOutputBestEffort(job, failedTask, "stderr")
@@ -473,7 +484,6 @@ func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.Execution
 	for key, value := range job.Variables {
 		onErrorVariables[key] = value
 	}
-	// TODO: find first failed task (by End Date)

 	if failedTask != nil {
 		onErrorVariables["failedTaskName"] = failedTask.Name
@@ -606,10 +616,9 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
 	// NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only
 	// if one task failed, and we want to kill the other tasks.
 	if jt.Errored {
-		if jt.End == nil {
-			// Remember ending time in case of error (we need this to identify the correct onError hook)
-			now := time.Now()
-			jt.End = &now
+		if j.firstFailedTask == nil {
+			// Remember the first failed task for later use in error handling
+			j.firstFailedTask = jt
 		}
 		pipelineDef, found := r.defs.Pipelines[j.Pipeline]
 		if found && !pipelineDef.ContinueRunningTasksAfterFailure {
@@ -979,13 +988,16 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error {
 		// Wait for all running jobs to have called JobCompleted
 		r.wg.Wait()

-		// TODO This is not safe to do outside of the requestPersist loop, since we might have a save in progress. So we need to wait until the save loop is finished before calling SaveToStore.
+		// Wait until the persist loop is done
+		<-r.persistLoopDone
 		// Do a final save to include the state of recently completed jobs
 		r.SaveToStore()
 	}()

 	r.mx.Lock()
 	r.isShuttingDown = true
+	r.shutdownCancel()
+
 	// Cancel all jobs on wait list
 	for pipelineName, jobs := range r.waitListByPipeline {
 		for _, job := range jobs {
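To make the shutdown ordering easier to follow, here is a minimal, self-contained sketch of the coordination pattern this diff introduces, reduced to the essentials; the names (miniRunner, newMiniRunner, saveToStore, requestPersist) are hypothetical and this is not the project's actual code. Cancelling the loop's context makes the persist goroutine return, its deferred close of the done channel tells Shutdown that no save can still be in flight, and only then does Shutdown perform the final save.

// Hypothetical, simplified illustration of the shutdown/persist coordination
// pattern in the diff above; the names below do not exist in the real codebase.
package main

import (
	"context"
	"fmt"
	"time"
)

type miniRunner struct {
	persistRequests chan struct{} // buffered with one slot so a pending request never blocks
	persistLoopDone chan struct{} // closed when the persist loop has exited
	shutdownCancel  context.CancelFunc
}

func newMiniRunner(ctx context.Context) *miniRunner {
	ctx, cancel := context.WithCancel(ctx)
	r := &miniRunner{
		persistRequests: make(chan struct{}, 1),
		persistLoopDone: make(chan struct{}),
		shutdownCancel:  cancel,
	}
	go func() {
		defer close(r.persistLoopDone) // signal loop exit to Shutdown
		for {
			select {
			case <-ctx.Done():
				return
			case <-r.persistRequests:
				r.saveToStore()
			}
		}
	}()
	return r
}

// requestPersist queues a save without blocking; a request arriving while one is
// already pending is dropped because the single buffered slot is full.
func (r *miniRunner) requestPersist() {
	select {
	case r.persistRequests <- struct{}{}:
	default:
	}
}

func (r *miniRunner) saveToStore() {
	time.Sleep(10 * time.Millisecond) // simulate slow I/O
	fmt.Println("state saved")
}

// Shutdown stops the persist loop, waits until it has really exited, and only
// then performs the final save, so the two can never overlap.
func (r *miniRunner) Shutdown() {
	r.shutdownCancel()  // make the persist loop's select return on ctx.Done()
	<-r.persistLoopDone // any save started by the loop has finished by now
	r.saveToStore()     // final save, guaranteed not to race with the loop
}

func main() {
	r := newMiniRunner(context.Background())
	r.requestPersist()
	time.Sleep(50 * time.Millisecond) // give the loop time to handle the request
	r.Shutdown()
}

The important design point mirrored from the diff is the ordering: cancel first, then wait on the done channel, then save, which replaces the removed TODO about a possible in-progress save.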