Skip to content

Commit fc25a6f

Browse files
Merge pull request #264 from wking/run-graph-goroutine-refactor
pkg/payload/task_graph: Handle node pushing and result collection without a goroutine
2 parents 68904d1 + 55ef3d3 commit fc25a6f

File tree

2 files changed

+101
-141
lines changed

2 files changed

+101
-141
lines changed

pkg/payload/task_graph.go

Lines changed: 85 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,12 @@ import (
55
"fmt"
66
"math/rand"
77
"regexp"
8-
"sort"
98
"strings"
109
"sync"
1110

12-
"k8s.io/klog"
13-
1411
"k8s.io/apimachinery/pkg/runtime/schema"
1512
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
13+
"k8s.io/klog"
1614
)
1715

1816
// SplitOnJobs enforces the rule that any Job in the payload prevents reordering or parallelism (either before or after)
@@ -426,180 +424,128 @@ type runTasks struct {
426424
}
427425

428426
type taskStatus struct {
429-
index int
430-
success bool
427+
index int
428+
error error
431429
}
432430

433431
// RunGraph executes the provided graph in order and in parallel up to maxParallelism. It will not start
434432
// a new TaskNode until all of the prerequisites have completed. If fn returns an error, no dependencies
435433
// of that node will be executed, but other indepedent edges will continue executing.
436434
func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func(ctx context.Context, tasks []*Task) error) []error {
437-
nestedCtx, cancelFn := context.WithCancel(ctx)
438-
defer cancelFn()
439-
440-
// This goroutine takes nodes from the graph as they are available (their prereq has completed) and
441-
// sends them to workCh. It uses completeCh to know that a previously dispatched item is complete.
442-
completeCh := make(chan taskStatus, maxParallelism)
443-
defer close(completeCh)
435+
submitted := make([]bool, len(graph.Nodes))
436+
results := make([]*taskStatus, len(graph.Nodes))
444437

445-
workCh := make(chan runTasks, maxParallelism)
446-
go func() {
447-
defer close(workCh)
448-
449-
// visited tracks nodes we have not sent (0), are currently
450-
// waiting for completion (1), or have completed (2,3)
451-
const (
452-
nodeNotVisited int = iota
453-
nodeWorking
454-
nodeFailed
455-
nodeComplete
456-
)
457-
visited := make([]int, len(graph.Nodes))
458-
canVisit := func(node *TaskNode) bool {
459-
for _, previous := range node.In {
460-
switch visited[previous] {
461-
case nodeFailed, nodeWorking, nodeNotVisited:
462-
return false
463-
}
438+
canVisit := func(node *TaskNode) bool {
439+
for _, previous := range node.In {
440+
if result := results[previous]; result == nil || result.error != nil {
441+
return false
464442
}
465-
return true
466443
}
444+
return true
445+
}
467446

468-
remaining := len(graph.Nodes)
469-
var inflight int
470-
for {
471-
found := 0
472-
473-
// walk the graph, filling the work queue
474-
for i := 0; i < len(visited); i++ {
475-
if visited[i] != nodeNotVisited {
476-
continue
477-
}
478-
if canVisit(graph.Nodes[i]) {
479-
select {
480-
case workCh <- runTasks{index: i, tasks: graph.Nodes[i].Tasks}:
481-
visited[i] = nodeWorking
482-
found++
483-
inflight++
484-
default:
485-
break
486-
}
487-
}
488-
}
489-
490-
// try to empty the done channel
491-
for len(completeCh) > 0 {
492-
finished := <-completeCh
493-
if finished.success {
494-
visited[finished.index] = nodeComplete
495-
} else {
496-
visited[finished.index] = nodeFailed
497-
}
498-
remaining--
499-
inflight--
500-
found++
501-
}
502-
503-
if found > 0 {
447+
getNextNode := func() int {
448+
for i, node := range graph.Nodes {
449+
if submitted[i] {
504450
continue
505451
}
506-
507-
// no more work to hand out
508-
if remaining == 0 {
509-
klog.V(4).Infof("Graph is complete")
510-
return
452+
if canVisit(node) {
453+
return i
511454
}
455+
}
512456

513-
// we walked the entire graph, there are still nodes remaining, but we're not waiting
514-
// for anything
515-
if inflight == 0 && found == 0 {
516-
klog.V(4).Infof("No more reachable nodes in graph, continue")
517-
break
518-
}
457+
return -1
458+
}
519459

520-
// we did nothing this round, so we have to wait for more
521-
finished, ok := <-completeCh
522-
if !ok {
523-
// we've been aborted
524-
klog.V(4).Infof("Stopped graph walker due to cancel")
525-
return
526-
}
527-
if finished.success {
528-
visited[finished.index] = nodeComplete
529-
} else {
530-
visited[finished.index] = nodeFailed
531-
}
532-
remaining--
533-
inflight--
534-
}
460+
// Tasks go out to the workers via workCh, and results come brack
461+
// from the workers via resultCh.
462+
workCh := make(chan runTasks, maxParallelism)
463+
defer close(workCh)
535464

536-
// take everything remaining and process in order
537-
var unreachable []*Task
538-
for i := 0; i < len(visited); i++ {
539-
if visited[i] == nodeNotVisited && canVisit(graph.Nodes[i]) {
540-
unreachable = append(unreachable, graph.Nodes[i].Tasks...)
541-
}
542-
}
543-
if len(unreachable) > 0 {
544-
sort.Slice(unreachable, func(i, j int) bool {
545-
a, b := unreachable[i], unreachable[j]
546-
return a.Index < b.Index
547-
})
548-
workCh <- runTasks{index: -1, tasks: unreachable}
549-
klog.V(4).Infof("Waiting for last tasks")
550-
<-completeCh
551-
}
552-
klog.V(4).Infof("No more work")
553-
}()
465+
resultCh := make(chan taskStatus, maxParallelism)
466+
defer close(resultCh)
467+
468+
nestedCtx, cancelFn := context.WithCancel(ctx)
469+
defer cancelFn()
554470

555-
errCh := make(chan error, maxParallelism)
556471
wg := sync.WaitGroup{}
557472
if maxParallelism < 1 {
558473
maxParallelism = 1
559474
}
560475
for i := 0; i < maxParallelism; i++ {
561476
wg.Add(1)
562-
go func(job int) {
477+
go func(ctx context.Context, job int) {
563478
defer utilruntime.HandleCrash()
564479
defer wg.Done()
565480
for {
566481
select {
567-
case <-nestedCtx.Done():
568-
klog.V(4).Infof("Canceled worker %d", job)
482+
case <-ctx.Done():
483+
klog.V(4).Infof("Canceled worker %d while waiting for work", job)
569484
return
570-
case runTask, ok := <-workCh:
571-
if !ok {
572-
klog.V(4).Infof("No more work for %d", job)
573-
return
574-
}
485+
case runTask := <-workCh:
575486
klog.V(4).Infof("Running %d on worker %d", runTask.index, job)
576-
err := fn(nestedCtx, runTask.tasks)
577-
completeCh <- taskStatus{index: runTask.index, success: err == nil}
578-
if err != nil {
579-
errCh <- err
580-
}
487+
err := fn(ctx, runTask.tasks)
488+
resultCh <- taskStatus{index: runTask.index, error: err}
581489
}
582490
}
583-
}(i)
491+
}(nestedCtx, i)
492+
}
493+
494+
var inflight int
495+
nextNode := getNextNode()
496+
done := false
497+
for !done {
498+
switch {
499+
case ctx.Err() == nil && nextNode >= 0: // push a task or collect a result
500+
select {
501+
case workCh <- runTasks{index: nextNode, tasks: graph.Nodes[nextNode].Tasks}:
502+
submitted[nextNode] = true
503+
inflight++
504+
case result := <-resultCh:
505+
results[result.index] = &result
506+
inflight--
507+
case <-ctx.Done():
508+
}
509+
case inflight > 0: // no work available to push; collect results
510+
result := <-resultCh
511+
results[result.index] = &result
512+
inflight--
513+
default: // no work to push and nothing in flight. We're done
514+
done = true
515+
}
516+
if !done {
517+
nextNode = getNextNode()
518+
}
584519
}
585-
go func() {
586-
klog.V(4).Infof("Waiting for workers to complete")
587-
wg.Wait()
588-
klog.V(4).Infof("Workers finished")
589-
close(errCh)
590-
}()
520+
521+
cancelFn()
522+
wg.Wait()
523+
klog.V(4).Infof("Workers finished")
591524

592525
var errs []error
593-
for err := range errCh {
594-
errs = append(errs, err)
526+
var firstIncompleteNode *TaskNode
527+
incompleteCount := 0
528+
for i, result := range results {
529+
if result == nil {
530+
if firstIncompleteNode == nil {
531+
firstIncompleteNode = graph.Nodes[i]
532+
}
533+
incompleteCount++
534+
} else if result.error != nil {
535+
errs = append(errs, result.error)
536+
}
537+
}
538+
539+
if len(errs) == 0 && firstIncompleteNode != nil {
540+
errs = append(errs, fmt.Errorf("%d incomplete task nodes, beginning with %s", incompleteCount, firstIncompleteNode.Tasks[0]))
541+
if err := ctx.Err(); err != nil {
542+
errs = append(errs, err)
543+
}
595544
}
545+
596546
klog.V(4).Infof("Result of work: %v", errs)
597547
if len(errs) > 0 {
598548
return errs
599549
}
600-
// if the context was cancelled, we may have unfinished work
601-
if err := ctx.Err(); err != nil {
602-
return []error{err}
603-
}
604550
return nil
605551
}

pkg/payload/task_graph_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,21 @@ func TestRunGraph(t *testing.T) {
704704
tasks := func(names ...string) []*Task {
705705
var arr []*Task
706706
for _, name := range names {
707-
arr = append(arr, &Task{Manifest: &lib.Manifest{OriginalFilename: name}})
707+
manifest := &lib.Manifest{OriginalFilename: name}
708+
err := manifest.UnmarshalJSON([]byte(fmt.Sprintf(`
709+
{
710+
"apiVersion": "v1",
711+
"kind": "ConfigMap",
712+
"metadata": {
713+
"name": "%s",
714+
"namespace": "default"
715+
}
716+
}
717+
`, name)))
718+
if err != nil {
719+
t.Fatalf("load %s: %v", name, err)
720+
}
721+
arr = append(arr, &Task{Manifest: manifest})
708722
}
709723
return arr
710724
}
@@ -862,7 +876,7 @@ func TestRunGraph(t *testing.T) {
862876
return nil
863877
},
864878
want: []string{"a"},
865-
wantErrs: []string{"context canceled"},
879+
wantErrs: []string{`1 incomplete task nodes, beginning with configmap "default/b" (0 of 0)`, "context canceled"},
866880
},
867881
}
868882
for _, tt := range tests {

0 commit comments

Comments
 (0)