Skip to content

Commit eaa3d19

Browse files
committed
pkg/payload/task_graph: RunGraph error when canceled
The RunGraph implementation was unchanged since it landed in cb4e037 (payload: Create a task graph that can split a payload into chunks, 2019-01-17, #88), with the exception of later logging and c2ac20f (status: Report the operators that have not yet deployed, 2019-04-09, #158) with the adjusted return type. The old code launched a goroutine for the pushing/reaping, which was an unecessary, and made error reporting on any outstanding tasks more complicated. I'd like to drop the goroutine, but Clayton is not comfortable with backporting that large a change to older releases [1]. And I'd like to be able to return errors like: 1 incomplete task nodes, beginning with b but Clayton thinks these are just "took too long, but we're still making progress" and that they'll resolve on their own in the next attempt or few, and that they're not actual deadlocks where you'd want a better fingerprint to pin down the node(s) that were locking [2]. This commit ensures that when we are canceled we return an error, and it does none of the refactoring we'd need to be able to say whether we had unprocessed nodes (for late cancels, it's possible that we could return "I was canceled" even if we had successfully pushed and reaped all the nodes). This should avoid situations like [3]: 2019-10-21T10:34:30.63940461Z I1021 10:34:30.639073 1 start.go:19] ClusterVersionOperator v1.0.0-106-g0725bd53-dirty ... 2019-10-21T10:34:31.132673574Z I1021 10:34:31.132635 1 sync_worker.go:453] Running sync quay.io/runcom/origin-release:v4.2-1196 (force=true) on generation 2 in state Updating at attempt 0 ... 2019-10-21T10:40:16.168632703Z I1021 10:40:16.168604 1 sync_worker.go:579] Running sync for customresourcedefinition "baremetalhosts.metal3.io" (101 of 432) 2019-10-21T10:40:16.18425522Z I1021 10:40:16.184220 1 task_graph.go:583] Canceled worker 0 2019-10-21T10:40:16.184381244Z I1021 10:40:16.184360 1 task_graph.go:583] Canceled worker 3 ... 2019-10-21T10:40:16.21772875Z I1021 10:40:16.217715 1 task_graph.go:603] Workers finished 2019-10-21T10:40:16.217777479Z I1021 10:40:16.217759 1 task_graph.go:611] Result of work: [] 2019-10-21T10:40:16.217864206Z I1021 10:40:16.217846 1 task_graph.go:539] Stopped graph walker due to cancel ... 2019-10-21T10:43:08.743798997Z I1021 10:43:08.743740 1 sync_worker.go:453] Running sync quay.io/runcom/origin-release:v4.2-1196 (force=true) on generation 2 in state Reconciling at attempt 0 ... where the CVO canceled some workers, saw that there are worker no errors, and decided "upgrade complete" despite never having attempted to push the bulk of its manifests. Without the task_graph.go changes in this commit, the new test fails with: $ go test -run TestRunGraph ./pkg/payload/ --- FAIL: TestRunGraph (1.03s) --- FAIL: TestRunGraph/cancelation_without_task_errors_is_reported (1.00s) task_graph_test.go:910: unexpected error: [] FAIL FAIL github.com/openshift/cluster-version-operator/pkg/payload 1.042s Also change "cancelled" -> "canceled" to match Go's docs [4] and name the other test cases. [1]: #255 (comment) [2]: #260 [3]: https://storage.googleapis.com/origin-ci-test/logs/release-openshift-origin-installer-e2e-aws-upgrade-4.1/754/artifacts/e2e-aws-upgrade/must-gather/registry-svc-ci-openshift-org-origin-4-1-sha256-f8c863ea08d64eea7b3a9ffbbde9c01ca90501afe6c0707e9c35f0ed7e92a9df/namespaces/openshift-cluster-version/pods/cluster-version-operator-5f5d465967-t57b2/cluster-version-operator/cluster-version-operator/logs/current.log [4]: https://golang.org/pkg/context/#pkg-overview
1 parent ae1f16b commit eaa3d19

File tree

2 files changed

+32
-3
lines changed

2 files changed

+32
-3
lines changed

pkg/payload/task_graph.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,5 +612,9 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
612612
if len(errs) > 0 {
613613
return errs
614614
}
615+
// if the context was cancelled, we may have unfinished work
616+
if err := ctx.Err(); err != nil {
617+
return []error{err}
618+
}
615619
return nil
616620
}

pkg/payload/task_graph_test.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -721,12 +721,14 @@ func TestRunGraph(t *testing.T) {
721721
wantErrs []string
722722
}{
723723
{
724+
name: "tasks executed in order",
724725
nodes: []*TaskNode{
725726
{Tasks: tasks("a", "b")},
726727
},
727728
order: []string{"a", "b"},
728729
},
729730
{
731+
name: "nodes executed after dependencies",
730732
nodes: []*TaskNode{
731733
{Tasks: tasks("c"), In: []int{3}},
732734
{Tasks: tasks("d", "e"), In: []int{3}},
@@ -756,6 +758,7 @@ func TestRunGraph(t *testing.T) {
756758
},
757759
},
758760
{
761+
name: "task error interrupts node processing",
759762
nodes: []*TaskNode{
760763
{Tasks: tasks("c"), In: []int{2}},
761764
{Tasks: tasks("d"), In: []int{2}, Out: []int{3}},
@@ -781,6 +784,7 @@ func TestRunGraph(t *testing.T) {
781784
},
782785
},
783786
{
787+
name: "mid-task cancellation error interrupts node processing",
784788
nodes: []*TaskNode{
785789
{Tasks: tasks("c"), In: []int{2}},
786790
{Tasks: tasks("d"), In: []int{2}, Out: []int{3}},
@@ -796,15 +800,15 @@ func TestRunGraph(t *testing.T) {
796800
case <-time.After(time.Second):
797801
t.Fatalf("expected context")
798802
case <-ctx.Done():
799-
t.Logf("got cancelled context")
800-
return fmt.Errorf("cancelled")
803+
t.Logf("got canceled context")
804+
return ctx.Err()
801805
}
802806
return fmt.Errorf("error A")
803807
}
804808
return nil
805809
},
806810
want: []string{"a", "b", "c"},
807-
wantErrs: []string{"cancelled"},
811+
wantErrs: []string{"context canceled"},
808812
invariants: func(t *testing.T, got []string) {
809813
for _, s := range got {
810814
if s == "e" {
@@ -814,6 +818,7 @@ func TestRunGraph(t *testing.T) {
814818
},
815819
},
816820
{
821+
name: "task errors in parallel nodes both reported",
817822
nodes: []*TaskNode{
818823
{Tasks: tasks("a"), Out: []int{1}},
819824
{Tasks: tasks("b"), In: []int{0}, Out: []int{2, 4, 8}},
@@ -839,6 +844,26 @@ func TestRunGraph(t *testing.T) {
839844
want: []string{"a", "b", "d1", "d2", "d3"},
840845
wantErrs: []string{"error - c1", "error - f"},
841846
},
847+
{
848+
name: "cancelation without task errors is reported",
849+
nodes: []*TaskNode{
850+
{Tasks: tasks("a"), Out: []int{1}},
851+
{Tasks: tasks("b"), In: []int{0}},
852+
},
853+
sleep: time.Millisecond,
854+
parallel: 1,
855+
errorOn: func(t *testing.T, name string, ctx context.Context, cancelFn func()) error {
856+
if name == "a" {
857+
cancelFn()
858+
time.Sleep(time.Second)
859+
return nil
860+
}
861+
t.Fatalf("task b should never run")
862+
return nil
863+
},
864+
want: []string{"a"},
865+
wantErrs: []string{"context canceled"},
866+
},
842867
}
843868
for _, tt := range tests {
844869
t.Run(tt.name, func(t *testing.T) {

0 commit comments

Comments
 (0)