Skip to content

Commit 0725460

Browse files
Merge pull request #255 from wking/canceled-workers-are-errors
Bug 1763821: pkg/payload/task_graph: Canceling the task graph partway though is an error even if no tasks fail
2 parents 56c6a51 + eaa3d19 commit 0725460

File tree

2 files changed

+32
-3
lines changed

2 files changed

+32
-3
lines changed

pkg/payload/task_graph.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,5 +612,9 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
612612
if len(errs) > 0 {
613613
return errs
614614
}
615+
// if the context was cancelled, we may have unfinished work
616+
if err := ctx.Err(); err != nil {
617+
return []error{err}
618+
}
615619
return nil
616620
}

pkg/payload/task_graph_test.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -721,12 +721,14 @@ func TestRunGraph(t *testing.T) {
721721
wantErrs []string
722722
}{
723723
{
724+
name: "tasks executed in order",
724725
nodes: []*TaskNode{
725726
{Tasks: tasks("a", "b")},
726727
},
727728
order: []string{"a", "b"},
728729
},
729730
{
731+
name: "nodes executed after dependencies",
730732
nodes: []*TaskNode{
731733
{Tasks: tasks("c"), In: []int{3}},
732734
{Tasks: tasks("d", "e"), In: []int{3}},
@@ -756,6 +758,7 @@ func TestRunGraph(t *testing.T) {
756758
},
757759
},
758760
{
761+
name: "task error interrupts node processing",
759762
nodes: []*TaskNode{
760763
{Tasks: tasks("c"), In: []int{2}},
761764
{Tasks: tasks("d"), In: []int{2}, Out: []int{3}},
@@ -781,6 +784,7 @@ func TestRunGraph(t *testing.T) {
781784
},
782785
},
783786
{
787+
name: "mid-task cancellation error interrupts node processing",
784788
nodes: []*TaskNode{
785789
{Tasks: tasks("c"), In: []int{2}},
786790
{Tasks: tasks("d"), In: []int{2}, Out: []int{3}},
@@ -796,15 +800,15 @@ func TestRunGraph(t *testing.T) {
796800
case <-time.After(time.Second):
797801
t.Fatalf("expected context")
798802
case <-ctx.Done():
799-
t.Logf("got cancelled context")
800-
return fmt.Errorf("cancelled")
803+
t.Logf("got canceled context")
804+
return ctx.Err()
801805
}
802806
return fmt.Errorf("error A")
803807
}
804808
return nil
805809
},
806810
want: []string{"a", "b", "c"},
807-
wantErrs: []string{"cancelled"},
811+
wantErrs: []string{"context canceled"},
808812
invariants: func(t *testing.T, got []string) {
809813
for _, s := range got {
810814
if s == "e" {
@@ -814,6 +818,7 @@ func TestRunGraph(t *testing.T) {
814818
},
815819
},
816820
{
821+
name: "task errors in parallel nodes both reported",
817822
nodes: []*TaskNode{
818823
{Tasks: tasks("a"), Out: []int{1}},
819824
{Tasks: tasks("b"), In: []int{0}, Out: []int{2, 4, 8}},
@@ -839,6 +844,26 @@ func TestRunGraph(t *testing.T) {
839844
want: []string{"a", "b", "d1", "d2", "d3"},
840845
wantErrs: []string{"error - c1", "error - f"},
841846
},
847+
{
848+
name: "cancelation without task errors is reported",
849+
nodes: []*TaskNode{
850+
{Tasks: tasks("a"), Out: []int{1}},
851+
{Tasks: tasks("b"), In: []int{0}},
852+
},
853+
sleep: time.Millisecond,
854+
parallel: 1,
855+
errorOn: func(t *testing.T, name string, ctx context.Context, cancelFn func()) error {
856+
if name == "a" {
857+
cancelFn()
858+
time.Sleep(time.Second)
859+
return nil
860+
}
861+
t.Fatalf("task b should never run")
862+
return nil
863+
},
864+
want: []string{"a"},
865+
wantErrs: []string{"context canceled"},
866+
},
842867
}
843868
for _, tt := range tests {
844869
t.Run(tt.name, func(t *testing.T) {

0 commit comments

Comments
 (0)