
Commit 1076662

re-implement on-error handling
1 parent f64ad94 commit 1076662

File tree: 4 files changed, +225 / -147 lines changed


definition/pipelines.go

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ type PipelineDef struct {
 	// - failedTaskError: Error message of the failed task
 	// - failedTaskStdout: Stdout of the failed task
 	// - failedTaskStderr: Stderr of the failed task
-	OnError OnErrorTaskDef `yaml:"onError"`
+	OnError *OnErrorTaskDef `yaml:"onError"`
 
 	// SourcePath stores the source path where the pipeline was defined
 	SourcePath string
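
Changing OnError from a value to a pointer lets callers distinguish "no onError block configured" from an empty definition with a plain nil check (which is what the new buildErrorGraph in prunner.go does). A minimal, self-contained sketch of that check; the field shapes of the stand-in types are assumed for illustration and are not part of this commit:

package main

import "fmt"

// Simplified stand-ins for the prunner definition types (shapes assumed for illustration).
type OnErrorTaskDef struct {
	Script []string
	Env    map[string]string
}

type PipelineDef struct {
	OnError *OnErrorTaskDef `yaml:"onError"`
}

func main() {
	var def PipelineDef // no onError block in the YAML: OnError stays nil
	if def.OnError == nil {
		fmt.Println("no onError handler configured for this pipeline")
		return
	}
	fmt.Printf("onError handler with %d script line(s)\n", len(def.OnError.Script))
}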

prunner.go

Lines changed: 160 additions & 144 deletions
@@ -42,7 +42,7 @@ type PipelineRunner struct {
 	// externally, call requestPersist()
 	persistRequests chan struct{}
 
-	// Mutex for reading or writing jobs and job state
+	// Mutex for reading or writing pipeline definitions (defs), jobs and job state
 	mx sync.RWMutex
 	createTaskRunner func(j *PipelineJob) taskctl.Runner

@@ -104,7 +104,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
 // It can be scheduled (in the waitListByPipeline of PipelineRunner),
 // or currently running (jobsByID / jobsByPipeline in PipelineRunner).
 type PipelineJob struct {
-	ID uuid.UUID
+	ID uuid.UUID
+	// Identifier of the pipeline (from the YAML file)
 	Pipeline string
 	Env map[string]string
 	Variables map[string]interface{}

@@ -195,6 +196,10 @@ var ErrJobNotFound = errors.New("job not found")
 var errJobAlreadyCompleted = errors.New("job is already completed")
 var ErrShuttingDown = errors.New("runner is shutting down")
 
+// ScheduleAsync schedules a pipeline execution, if the pipeline's concurrency configuration allows it.
+// "pipeline" is the pipeline ID from the YAML file.
+//
+// The returned PipelineJob is the individual execution context.
 func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) {
 	r.mx.Lock()
 	defer r.mx.Unlock()
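
The new doc comment describes the entry point for scheduling a run. For context, a hypothetical caller might look like the sketch below; the import path and the zero-value ScheduleOpts are assumptions for illustration and not part of this diff:

package example

import (
	"fmt"

	prunner "github.com/Flowpack/prunner" // import path assumed for illustration
)

func scheduleRelease(runner *prunner.PipelineRunner) error {
	// Schedule a run of the pipeline with ID "release" (as defined in the YAML file).
	job, err := runner.ScheduleAsync("release", prunner.ScheduleOpts{})
	if err != nil {
		// e.g. ErrShuttingDown, or the pipeline's concurrency config rejected the run
		return err
	}
	fmt.Printf("scheduled job %s for pipeline %q\n", job.ID, job.Pipeline)
	return nil
}
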
@@ -399,11 +404,163 @@ func (r *PipelineRunner) startJob(job *PipelineJob) {
 	go func() {
 		defer r.wg.Done()
 		lastErr := job.sched.Schedule(graph)
+		if lastErr != nil {
+			r.RunJobErrorHandler(job)
+		}
 		r.JobCompleted(job.ID, lastErr)
 	}()
 }
+
+func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) {
+	errorGraph, err := r.buildErrorGraph(job)
+	if err != nil {
+		log.
+			WithError(err).
+			WithField("jobID", job.ID).
+			WithField("pipeline", job.Pipeline).
+			Error("Failed to build error pipeline graph")
+		// An error occurred while building the error handling graph itself - nothing more we can do here.
+		return
+	}
+
+	// If errorGraph is nil (and err is nil), no error handling is configured for this pipeline.
+	if errorGraph != nil {
+		// Re-init the scheduler, as we need a new one to schedule the error handler on (the old one
+		// is already shut down if ContinueRunningTasksAfterFailure == false).
+		r.mx.Lock()
+		r.initScheduler(job)
+		r.mx.Unlock()
+
+		err = job.sched.Schedule(errorGraph)
+
+		if err != nil {
+			log.
+				WithError(err).
+				WithField("jobID", job.ID).
+				WithField("pipeline", job.Pipeline).
+				Error("Failed to run error handling for job")
+		} else {
+			log.
+				WithField("jobID", job.ID).
+				WithField("pipeline", job.Pipeline).
+				Info("Error handling completed")
+		}
+	}
+}
+
+func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) {
+	r.mx.RLock()
+	defer r.mx.RUnlock()
+	pipelineDef, pipelineDefExists := r.defs.Pipelines[job.Pipeline]
+	if !pipelineDefExists {
+		return nil, fmt.Errorf("pipeline definition not found for pipeline %s (should never happen)", job.Pipeline)
+	}
+	onErrorTaskDef := pipelineDef.OnError
+	if onErrorTaskDef == nil {
+		// No error handling configured for this pipeline - return no graph and no error.
+		return nil, nil
+	}
+
+	// We assume the first failed task (by end date) is the root cause, because it then triggered a cascading abort.
+	failedTask := findFirstFailedTaskByEndDate(job.Tasks)
+
+	failedTaskStdout := r.readTaskOutputBestEffort(job, failedTask, "stdout")
+	failedTaskStderr := r.readTaskOutputBestEffort(job, failedTask, "stderr")
+
+	onErrorVariables := make(map[string]interface{})
+	for key, value := range job.Variables {
+		onErrorVariables[key] = value
+	}
+
+	if failedTask != nil {
+		onErrorVariables["failedTaskName"] = failedTask.Name
+		onErrorVariables["failedTaskExitCode"] = failedTask.ExitCode
+		onErrorVariables["failedTaskError"] = failedTask.Error
+		onErrorVariables["failedTaskStdout"] = string(failedTaskStdout)
+		onErrorVariables["failedTaskStderr"] = string(failedTaskStderr)
+	} else {
+		onErrorVariables["failedTaskName"] = "task_not_identified_should_not_happen"
+		onErrorVariables["failedTaskExitCode"] = "99"
+		onErrorVariables["failedTaskError"] = "task_not_identified_should_not_happen"
+		onErrorVariables["failedTaskStdout"] = "task_not_identified_should_not_happen"
+		onErrorVariables["failedTaskStderr"] = "task_not_identified_should_not_happen"
+	}
+
+	onErrorJobTask := jobTask{
+		TaskDef: definition.TaskDef{
+			Script: onErrorTaskDef.Script,
+			// AllowFailure needs to be false, otherwise lastErr in the caller won't be filled (so errors would not appear in the log).
+			AllowFailure: false,
+			Env:          onErrorTaskDef.Env,
+		},
+		Name:   OnErrorTaskName,
+		Status: toStatus(scheduler.StatusWaiting),
+	}
+	job.Tasks = append(job.Tasks, onErrorJobTask)
+
+	return buildPipelineGraph(job.ID, jobTasks{onErrorJobTask}, onErrorVariables)
+}
+
+func (r *PipelineRunner) readTaskOutputBestEffort(job *PipelineJob, task *jobTask, outputName string) []byte {
+	if task == nil || job == nil {
+		return []byte(nil)
+	}
+
+	rc, err := r.outputStore.Reader(job.ID.String(), task.Name, outputName)
+	if err != nil {
+		log.
+			WithField("component", "runner").
+			WithField("jobID", job.ID.String()).
+			WithField("pipeline", job.Pipeline).
+			WithField("failedTaskName", task.Name).
+			WithField("outputName", outputName).
+			WithError(err).
+			Debug("Could not create output reader for failed task")
+		return []byte(nil)
+	}
+	defer func(rc io.ReadCloser) {
+		_ = rc.Close()
+	}(rc)
+
+	outputAsBytes, err := io.ReadAll(rc)
+	if err != nil {
+		log.
+			WithField("component", "runner").
+			WithField("jobID", job.ID.String()).
+			WithField("pipeline", job.Pipeline).
+			WithField("failedTaskName", task.Name).
+			WithField("outputName", outputName).
+			WithError(err).
+			Debug("Could not read output of task")
+	}
+
+	return outputAsBytes
+}
 
-// HandleTaskChange will be called when the task state changes in the task runner
+// findFirstFailedTaskByEndDate returns the first failed task, ordered by end date.
+// A task is considered failed if it errored (e.g. exited with a non-zero exit code).
+func findFirstFailedTaskByEndDate(tasks jobTasks) *jobTask {
+	var firstFailedTask *jobTask
+
+	for i := range tasks {
+		task := &tasks[i]
+
+		// Check if the task failed
+		if task.Errored {
+			// If this is the first failed task we see, or it ended earlier than our current earliest
+			if firstFailedTask == nil || task.End.Before(*firstFailedTask.End) {
+				firstFailedTask = task
+			}
+		}
+	}
+
+	return firstFailedTask
+}
+
+// HandleTaskChange will be called when the task state changes in the task runner (taskctl).
+// It is short-lived and updates our JobTask state accordingly.
 func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
 	r.mx.Lock()
 	defer r.mx.Unlock()
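
The new findFirstFailedTaskByEndDate helper encodes the root-cause heuristic: among all errored tasks, the one that ended earliest is treated as the root cause, and later failures are assumed to be follow-up aborts. A self-contained sketch of that selection rule, using stand-in types since jobTask is package-internal and not reproduced here:

package main

import (
	"fmt"
	"time"
)

// failedTaskInfo is a stand-in for the package-internal jobTask type (illustration only).
type failedTaskInfo struct {
	Name    string
	Errored bool
	End     *time.Time
}

// firstFailed applies the same rule as findFirstFailedTaskByEndDate:
// among all errored tasks, pick the one with the earliest end time.
func firstFailed(tasks []failedTaskInfo) *failedTaskInfo {
	var first *failedTaskInfo
	for i := range tasks {
		t := &tasks[i]
		if t.Errored && (first == nil || t.End.Before(*first.End)) {
			first = t
		}
	}
	return first
}

func main() {
	t1 := time.Now()
	t2 := t1.Add(2 * time.Second)
	tasks := []failedTaskInfo{
		{Name: "lint", Errored: true, End: &t2},  // aborted later, as a consequence
		{Name: "build", Errored: true, End: &t1}, // failed first: the root cause
	}
	fmt.Println(firstFailed(tasks).Name) // prints "build"
}
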
@@ -437,12 +594,6 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
 			// Use internal cancel since we already have a lock on the mutex
 			_ = r.cancelJobInternal(jobID)
 		}
-
-		if found && len(pipelineDef.OnError.Script) > 0 {
-			// we errored; and there is an onError script defined for the
-			// current pipeline. So let's run it.
-			r.runOnErrorScript(t, j, pipelineDef.OnError)
-		}
 	}
 
 	r.requestPersist()

@@ -475,141 +626,6 @@ func updateJobTaskStateFromTask(jt *jobTask, t *task.Task) {
 
 const OnErrorTaskName = "on_error"
 
-// runOnErrorScript is responsible for running a special "on_error" script in response to an error in the pipeline.
-// It exposes variables containing information about the errored task.
-//
-// The method is triggered with the errored Task t, belonging to pipelineJob j; and pipelineDev
-func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorTaskDef definition.OnErrorTaskDef) {
-	log.
-		WithField("component", "runner").
-		WithField("jobID", j.ID.String()).
-		WithField("pipeline", j.Pipeline).
-		WithField("failedTaskName", t.Name).
-		Debug("Triggering onError Script because of task failure")
-
-	var failedTaskStdout []byte
-	rc, err := r.outputStore.Reader(j.ID.String(), t.Name, "stdout")
-	if err != nil {
-		log.
-			WithField("component", "runner").
-			WithField("jobID", j.ID.String()).
-			WithField("pipeline", j.Pipeline).
-			WithField("failedTaskName", t.Name).
-			WithError(err).
-			Warn("Could not create stdout reader for failed task")
-	} else {
-		defer func(rc io.ReadCloser) {
-			_ = rc.Close()
-		}(rc)
-		failedTaskStdout, err = io.ReadAll(rc)
-		if err != nil {
-			log.
-				WithField("component", "runner").
-				WithField("jobID", j.ID.String()).
-				WithField("pipeline", j.Pipeline).
-				WithField("failedTaskName", t.Name).
-				WithError(err).
-				Warn("Could not read stdout of failed task")
-		}
-	}
-
-	var failedTaskStderr []byte
-	rc, err = r.outputStore.Reader(j.ID.String(), t.Name, "stderr")
-	if err != nil {
-		log.
-			WithField("component", "runner").
-			WithField("jobID", j.ID.String()).
-			WithField("pipeline", j.Pipeline).
-			WithField("failedTaskName", t.Name).
-			WithError(err).
-			Debug("Could not create stderrReader for failed task")
-	} else {
-		defer func(rc io.ReadCloser) {
-			_ = rc.Close()
-		}(rc)
-		failedTaskStderr, err = io.ReadAll(rc)
-		if err != nil {
-			log.
-				WithField("component", "runner").
-				WithField("jobID", j.ID.String()).
-				WithField("pipeline", j.Pipeline).
-				WithField("failedTaskName", t.Name).
-				WithError(err).
-				Debug("Could not read stderr of failed task")
-		}
-	}
-
-	onErrorVariables := make(map[string]interface{})
-	for key, value := range j.Variables {
-		onErrorVariables[key] = value
-	}
-	onErrorVariables["failedTaskName"] = t.Name
-	onErrorVariables["failedTaskExitCode"] = t.ExitCode
-	onErrorVariables["failedTaskError"] = t.Error
-	onErrorVariables["failedTaskStdout"] = string(failedTaskStdout)
-	onErrorVariables["failedTaskStderr"] = string(failedTaskStderr)
-
-	onErrorJobTask := jobTask{
-		TaskDef: definition.TaskDef{
-			Script: onErrorTaskDef.Script,
-			// AllowFailure needs to be false, otherwise lastError below won't be filled (so errors will not appear in the log)
-			AllowFailure: false,
-			Env:          onErrorTaskDef.Env,
-		},
-		Name:   OnErrorTaskName,
-		Status: toStatus(scheduler.StatusWaiting),
-	}
-
-	// store on task list; so that it appears in pipeline and UI etc
-	j.Tasks = append(j.Tasks, onErrorJobTask)
-
-	onErrorGraph, err := buildPipelineGraph(j.ID, jobTasks{onErrorJobTask}, onErrorVariables)
-	if err != nil {
-		log.
-			WithError(err).
-			WithField("jobID", j.ID).
-			WithField("pipeline", j.Pipeline).
-			Error("Failed to build onError pipeline graph")
-		onErrorJobTask.Error = err
-		onErrorJobTask.Errored = true
-
-		// the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to
-		// store it again after modifying it.
-		j.Tasks[len(j.Tasks)-1] = onErrorJobTask
-		return
-	}
-
-	// we use a detached taskRunner and scheduler to run the onError task, to
-	// run synchronously (as we are already in an async goroutine here), won't have any cycles,
-	// and to simplify the code.
-	taskRunner := r.createTaskRunner(j)
-	sched := taskctl.NewScheduler(taskRunner)
-
-	// Now, actually run the job synchronously
-	lastErr := sched.Schedule(onErrorGraph)
-
-	// Update job status as with normal jobs
-	onErrorJobTask.Status = toStatus(onErrorGraph.Nodes()[OnErrorTaskName].ReadStatus())
-	updateJobTaskStateFromTask(&onErrorJobTask, onErrorGraph.Nodes()[OnErrorTaskName].Task)
-
-	if lastErr != nil {
-		log.
-			WithError(err).
-			WithField("jobID", j.ID).
-			WithField("pipeline", j.Pipeline).
-			Error("Error running the onError handler")
-	} else {
-		log.
-			WithField("jobID", j.ID).
-			WithField("pipeline", j.Pipeline).
-			Debug("Successfully ran the onError handler")
-	}
-
-	// the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to
-	// store it again after modifying it.
-	j.Tasks[len(j.Tasks)-1] = onErrorJobTask
-}
-
 
 // HandleStageChange will be called when the stage state changes in the scheduler
 func (r *PipelineRunner) HandleStageChange(stage *scheduler.Stage) {
 	r.mx.Lock()
