Commit 55ef3d3

pkg/payload/task_graph: Handle node pushing and result collection without a goroutine
The RunGraph implementation was unchanged since it landed in cb4e037 (payload: Create a task graph that can split a payload into chunks, 2019-01-17, #88), with the exception of later logging, c2ac20f (status: Report the operators that have not yet deployed, 2019-04-09, #158) with the adjusted return type, and eaa3d19 (pkg/payload/task_graph: RunGraph error when canceled, 2019-10-21, #255) with its "I was canceled" error backstop.

The old code launched a goroutine for the pushing/reaping, which was unnecessary and made error reporting on any outstanding tasks more complicated. Dropping this goroutine let me get rid of errCh, which used to be used to pass errors back to the main goroutine. I also dropped the wg.Wait() goroutine, which used to be part of supporting the 'range errCh' used in the main goroutine to collect failed jobs. But because errCh was filled directly from the worker goroutines, nothing collected graph-coverage completeness information from the pushing/reaping goroutine.

I've also restructured pushing/reaping to be more resistant to locking and spinning. The old implementation had a node-pushing loop which attempted non-blocking pushes, then a result-reaping loop, then some condition checking, then a blocking result-reaping attempt (after "we did nothing this round, so we have to wait for more"). The idea with the blocking reap seems to have been that if the pusher/reaper didn't push anything (because workCh was full) and the earlier 'for len(completeCh) > 0' reaping didn't pull anything in (because workers were busy working), the pusher/reaper should block until it collected a result, in the hope that the worker which returned the result would have cleared a job out of workCh, giving the pusher/reaper space to push a new job on the next loop iteration. But if Go decides to give the pusher/reaper more time in the scheduler, it might attempt the next workCh push before that worker gets around to being scheduled and popping from workCh.
The pusher/reaper might then trickle down to the blocking reap and wait for another worker (hopefully maxParallelism > 1) to return a result, unblocking the pusher/reaper and giving it another shot at a workCh push. During this time, the worker that returned earlier is idling with nothing to do.

With this commit, when we have a next node to push, a single switch statement blocks until we are either able to push the node or to reap a result. So there's no need for a non-blocking push, and no chance of spinning, although it does mean we need to recalculate the next node after each channel action. When we've been canceled, we stop pushing into workCh but continue to reap from resultCh until we have no in-flight jobs left. And if we have nothing to push and nothing in flight to reap, we're obviously done, so that choice is a lot simpler now.

I've dropped the "Waiting for workers to complete" log line, because the wg.Wait() call should block for much less time now. And because the main RunGraph goroutine is doing the waiting, we no longer need the 'runTask, ok := <-workCh' read to protect against workCh being closed early. With wg.Wait() now called after we have drained all in-flight jobs (previously we launched it immediately after launching the workers), there is less call for the "Waiting for..." line.

But the most externally noticeable change is that now, if we run without any failing jobs to give us errors but fail to complete the graph, I fill in a new "incomplete task nodes" error, so folks don't have to look in the CVO logs to see how far the CVO got before hanging. It also allows us to not return the "I was canceled" error in cases where the cancellation happened late enough that we were still able to successfully process the full graph.
The unit change fills in a less-stubby Kubernetes object, avoiding:

    --- FAIL: TestRunGraph/cancelation_without_task_errors_is_reported (1.00s)
        task_graph_test.go:914: error 0 "1 incomplete task nodes, beginning with %!s(PANIC=String method: runtime error: invalid memory address or nil pointer dereference)" doesn't contain "1 incomplete task nodes, beginning with b"

when Task.String calls Unstructured.GetName, which explodes on the lack of expected Kubernetes-object metadata.
1 parent cb8241a commit 55ef3d3

File tree

2 files changed (+101, −141 lines)


pkg/payload/task_graph.go

Lines changed: 85 additions & 139 deletions
@@ -5,14 +5,12 @@ import (
 	"fmt"
 	"math/rand"
 	"regexp"
-	"sort"
 	"strings"
 	"sync"
 
-	"k8s.io/klog"
-
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/klog"
 )
 
 // SplitOnJobs enforces the rule that any Job in the payload prevents reordering or parallelism (either before or after)
@@ -426,180 +424,128 @@ type runTasks struct {
 }
 
 type taskStatus struct {
-	index   int
-	success bool
+	index int
+	error error
 }
 
 // RunGraph executes the provided graph in order and in parallel up to maxParallelism. It will not start
 // a new TaskNode until all of the prerequisites have completed. If fn returns an error, no dependencies
 // of that node will be executed, but other indepedent edges will continue executing.
 func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func(ctx context.Context, tasks []*Task) error) []error {
-	nestedCtx, cancelFn := context.WithCancel(ctx)
-	defer cancelFn()
-
-	// This goroutine takes nodes from the graph as they are available (their prereq has completed) and
-	// sends them to workCh. It uses completeCh to know that a previously dispatched item is complete.
-	completeCh := make(chan taskStatus, maxParallelism)
-	defer close(completeCh)
+	submitted := make([]bool, len(graph.Nodes))
+	results := make([]*taskStatus, len(graph.Nodes))
 
-	workCh := make(chan runTasks, maxParallelism)
-	go func() {
-		defer close(workCh)
-
-		// visited tracks nodes we have not sent (0), are currently
-		// waiting for completion (1), or have completed (2,3)
-		const (
-			nodeNotVisited int = iota
-			nodeWorking
-			nodeFailed
-			nodeComplete
-		)
-		visited := make([]int, len(graph.Nodes))
-		canVisit := func(node *TaskNode) bool {
-			for _, previous := range node.In {
-				switch visited[previous] {
-				case nodeFailed, nodeWorking, nodeNotVisited:
-					return false
-				}
+	canVisit := func(node *TaskNode) bool {
+		for _, previous := range node.In {
+			if result := results[previous]; result == nil || result.error != nil {
+				return false
 			}
-			return true
 		}
+		return true
+	}
 
-		remaining := len(graph.Nodes)
-		var inflight int
-		for {
-			found := 0
-
-			// walk the graph, filling the work queue
-			for i := 0; i < len(visited); i++ {
-				if visited[i] != nodeNotVisited {
-					continue
-				}
-				if canVisit(graph.Nodes[i]) {
-					select {
-					case workCh <- runTasks{index: i, tasks: graph.Nodes[i].Tasks}:
-						visited[i] = nodeWorking
-						found++
-						inflight++
-					default:
-						break
-					}
-				}
-			}
-
-			// try to empty the done channel
-			for len(completeCh) > 0 {
-				finished := <-completeCh
-				if finished.success {
-					visited[finished.index] = nodeComplete
-				} else {
-					visited[finished.index] = nodeFailed
-				}
-				remaining--
-				inflight--
-				found++
-			}
-
-			if found > 0 {
+	getNextNode := func() int {
+		for i, node := range graph.Nodes {
+			if submitted[i] {
 				continue
 			}
-
-			// no more work to hand out
-			if remaining == 0 {
-				klog.V(4).Infof("Graph is complete")
-				return
+			if canVisit(node) {
+				return i
 			}
+		}
 
-			// we walked the entire graph, there are still nodes remaining, but we're not waiting
-			// for anything
-			if inflight == 0 && found == 0 {
-				klog.V(4).Infof("No more reachable nodes in graph, continue")
-				break
-			}
+		return -1
+	}
 
-			// we did nothing this round, so we have to wait for more
-			finished, ok := <-completeCh
-			if !ok {
-				// we've been aborted
-				klog.V(4).Infof("Stopped graph walker due to cancel")
-				return
-			}
-			if finished.success {
-				visited[finished.index] = nodeComplete
-			} else {
-				visited[finished.index] = nodeFailed
-			}
-			remaining--
-			inflight--
-		}
+	// Tasks go out to the workers via workCh, and results come brack
+	// from the workers via resultCh.
+	workCh := make(chan runTasks, maxParallelism)
+	defer close(workCh)
 
-		// take everything remaining and process in order
-		var unreachable []*Task
-		for i := 0; i < len(visited); i++ {
-			if visited[i] == nodeNotVisited && canVisit(graph.Nodes[i]) {
-				unreachable = append(unreachable, graph.Nodes[i].Tasks...)
-			}
-		}
-		if len(unreachable) > 0 {
-			sort.Slice(unreachable, func(i, j int) bool {
-				a, b := unreachable[i], unreachable[j]
-				return a.Index < b.Index
-			})
-			workCh <- runTasks{index: -1, tasks: unreachable}
-			klog.V(4).Infof("Waiting for last tasks")
-			<-completeCh
-		}
-		klog.V(4).Infof("No more work")
-	}()
+	resultCh := make(chan taskStatus, maxParallelism)
+	defer close(resultCh)
+
+	nestedCtx, cancelFn := context.WithCancel(ctx)
+	defer cancelFn()
 
-	errCh := make(chan error, maxParallelism)
 	wg := sync.WaitGroup{}
 	if maxParallelism < 1 {
 		maxParallelism = 1
 	}
 	for i := 0; i < maxParallelism; i++ {
 		wg.Add(1)
-		go func(job int) {
+		go func(ctx context.Context, job int) {
 			defer utilruntime.HandleCrash()
 			defer wg.Done()
 			for {
 				select {
-				case <-nestedCtx.Done():
-					klog.V(4).Infof("Canceled worker %d", job)
+				case <-ctx.Done():
					klog.V(4).Infof("Canceled worker %d while waiting for work", job)
 					return
-				case runTask, ok := <-workCh:
-					if !ok {
-						klog.V(4).Infof("No more work for %d", job)
-						return
-					}
+				case runTask := <-workCh:
 					klog.V(4).Infof("Running %d on worker %d", runTask.index, job)
-					err := fn(nestedCtx, runTask.tasks)
-					completeCh <- taskStatus{index: runTask.index, success: err == nil}
-					if err != nil {
-						errCh <- err
-					}
+					err := fn(ctx, runTask.tasks)
+					resultCh <- taskStatus{index: runTask.index, error: err}
 				}
 			}
-		}(i)
+		}(nestedCtx, i)
+	}
+
+	var inflight int
+	nextNode := getNextNode()
+	done := false
+	for !done {
+		switch {
+		case ctx.Err() == nil && nextNode >= 0: // push a task or collect a result
+			select {
+			case workCh <- runTasks{index: nextNode, tasks: graph.Nodes[nextNode].Tasks}:
+				submitted[nextNode] = true
+				inflight++
+			case result := <-resultCh:
+				results[result.index] = &result
+				inflight--
+			case <-ctx.Done():
+			}
+		case inflight > 0: // no work available to push; collect results
+			result := <-resultCh
+			results[result.index] = &result
+			inflight--
+		default: // no work to push and nothing in flight. We're done
+			done = true
+		}
+		if !done {
+			nextNode = getNextNode()
+		}
 	}
-	go func() {
-		klog.V(4).Infof("Waiting for workers to complete")
-		wg.Wait()
-		klog.V(4).Infof("Workers finished")
-		close(errCh)
-	}()
+
+	cancelFn()
+	wg.Wait()
+	klog.V(4).Infof("Workers finished")
 
 	var errs []error
-	for err := range errCh {
-		errs = append(errs, err)
+	var firstIncompleteNode *TaskNode
+	incompleteCount := 0
+	for i, result := range results {
+		if result == nil {
+			if firstIncompleteNode == nil {
+				firstIncompleteNode = graph.Nodes[i]
+			}
+			incompleteCount++
+		} else if result.error != nil {
+			errs = append(errs, result.error)
+		}
+	}
+
+	if len(errs) == 0 && firstIncompleteNode != nil {
+		errs = append(errs, fmt.Errorf("%d incomplete task nodes, beginning with %s", incompleteCount, firstIncompleteNode.Tasks[0]))
+		if err := ctx.Err(); err != nil {
+			errs = append(errs, err)
+		}
 	}
+
 	klog.V(4).Infof("Result of work: %v", errs)
 	if len(errs) > 0 {
 		return errs
 	}
-	// if the context was cancelled, we may have unfinished work
-	if err := ctx.Err(); err != nil {
-		return []error{err}
-	}
 	return nil
 }

pkg/payload/task_graph_test.go

Lines changed: 16 additions & 2 deletions
@@ -704,7 +704,21 @@ func TestRunGraph(t *testing.T) {
 	tasks := func(names ...string) []*Task {
 		var arr []*Task
 		for _, name := range names {
-			arr = append(arr, &Task{Manifest: &lib.Manifest{OriginalFilename: name}})
+			manifest := &lib.Manifest{OriginalFilename: name}
+			err := manifest.UnmarshalJSON([]byte(fmt.Sprintf(`
+{
+	"apiVersion": "v1",
+	"kind": "ConfigMap",
+	"metadata": {
+		"name": "%s",
+		"namespace": "default"
+	}
+}
+`, name)))
+			if err != nil {
+				t.Fatalf("load %s: %v", name, err)
+			}
+			arr = append(arr, &Task{Manifest: manifest})
 		}
 		return arr
 	}
@@ -862,7 +876,7 @@ func TestRunGraph(t *testing.T) {
 				return nil
 			},
 			want:     []string{"a"},
-			wantErrs: []string{"context canceled"},
+			wantErrs: []string{`1 incomplete task nodes, beginning with configmap "default/b" (0 of 0)`, "context canceled"},
 		},
 	}
 	for _, tt := range tests {
