Skip to content

Commit cecc63f

Browse files
Merge pull request #1099 from petr-muller/ocpbugs-22442-test-run-graph-mid-task-cancellation-flake-cleanups
NO-JIRA: task graph: comments and cleanups
2 parents adf0cf5 + 95d0010 commit cecc63f

File tree

2 files changed

+51
-36
lines changed

2 files changed

+51
-36
lines changed

pkg/payload/task_graph.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,18 @@ func FlattenByNumberAndComponent(tasks []*Task) [][]*TaskNode {
128128
return [][]*TaskNode{groups}
129129
}
130130

131+
// TaskNode represents a node in a TaskGraph. The node assumes the graph is indexable and nodes are retrievable by index,
132+
// and In/Out are indices of other nodes connected to this one by incoming/outgoing edges, respectively.
131133
type TaskNode struct {
132-
In []int
134+
// In is a list of node indices from which there is an edge to this node (=prerequisites)
135+
In []int
136+
// Tasks to be executed when this node is visited
133137
Tasks []*Task
134-
Out []int
138+
// Out is a list of node indices to which there is an edge from this node (=dependents).
139+
Out []int
135140
}
136141

137-
func (n TaskNode) String() string {
142+
func (n *TaskNode) String() string {
138143
var arr []string
139144
for _, t := range n.Tasks {
140145
if len(t.Manifest.OriginalFilename) > 0 {
@@ -262,7 +267,7 @@ func (g *TaskGraph) Split(onFn func(task *Task) bool) {
262267
}
263268

264269
// BreakFunc returns the input tasks in order of dependencies with
265-
// explicit parallelizm allowed per task in an array of task nodes.
270+
// explicit parallelism allowed per task in an array of task nodes.
266271
type BreakFunc func([]*Task) [][]*TaskNode
267272

268273
// ShiftOrder rotates each TaskNode by step*len/stride when stride > len,
@@ -424,8 +429,8 @@ type taskStatus struct {
424429
}
425430

426431
// RunGraph executes the provided graph in order and in parallel up to maxParallelism. It will not start
427-
// a new TaskNode until all of the prerequisites have completed. If fn returns an error, no dependencies
428-
// of that node will be executed, but other indepedent edges will continue executing.
432+
// a new TaskNode until all its prerequisites have completed. If fn returns an error, no dependencies
433+
// of that node will be executed, but other independent edges will continue executing.
429434
func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func(ctx context.Context, tasks []*Task) error) []error {
430435
submitted := make([]bool, len(graph.Nodes))
431436
results := make([]*taskStatus, len(graph.Nodes))
@@ -452,7 +457,7 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
452457
return -1
453458
}
454459

455-
// Tasks go out to the workers via workCh, and results come brack
460+
// Tasks go out to the workers via workCh, and results come back
456461
// from the workers via resultCh.
457462
workCh := make(chan runTasks, maxParallelism)
458463
defer close(workCh)
@@ -469,16 +474,16 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
469474
}
470475
for i := 0; i < maxParallelism; i++ {
471476
wg.Add(1)
472-
go func(ctx context.Context, job int) {
477+
go func(ctx context.Context, worker int) {
473478
defer utilruntime.HandleCrash()
474479
defer wg.Done()
475480
for {
476481
select {
477482
case <-ctx.Done():
478-
klog.V(2).Infof("Canceled worker %d while waiting for work", job)
483+
klog.V(2).Infof("Worker %d: Received cancel signal while waiting for work", worker)
479484
return
480485
case runTask := <-workCh:
481-
klog.V(2).Infof("Running %d on worker %d", runTask.index, job)
486+
klog.V(2).Infof("Worker %d: Running job %d (with %d tasks)", worker, runTask.index, len(runTask.tasks))
482487
err := fn(ctx, runTask.tasks)
483488
resultCh <- taskStatus{index: runTask.index, error: err}
484489
}
@@ -487,9 +492,10 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
487492
}
488493

489494
var inflight int
490-
nextNode := getNextNode()
491-
done := false
495+
var done bool
496+
492497
for !done {
498+
nextNode := getNextNode()
493499
switch {
494500
case ctx.Err() == nil && nextNode >= 0: // push a task or collect a result
495501
select {
@@ -517,18 +523,16 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
517523
default: // no work to push and nothing in flight. We're done
518524
done = true
519525
}
520-
if !done {
521-
nextNode = getNextNode()
522-
}
523526
}
524527

525528
cancelFn()
526529
wg.Wait()
527-
klog.V(2).Infof("Workers finished")
530+
klog.V(2).Info("Workers finished")
528531

529532
var errs []error
530533
var firstIncompleteNode *TaskNode
531-
incompleteCount := 0
534+
var incompleteCount int
535+
532536
for i, result := range results {
533537
if result == nil {
534538
if firstIncompleteNode == nil && len(graph.Nodes[i].Tasks) > 0 {
@@ -547,9 +551,6 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
547551
}
548552
}
549553

550-
klog.V(2).Infof("Result of work: %v", errs)
551-
if len(errs) > 0 {
552-
return errs
553-
}
554-
return nil
554+
klog.V(2).Infof("Result of work: errs=%v", errs)
555+
return errs
555556
}

pkg/payload/task_graph_test.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -703,21 +703,11 @@ func TestRunGraph(t *testing.T) {
703703
tasks := func(names ...string) []*Task {
704704
var arr []*Task
705705
for _, name := range names {
706-
manifest := &manifest.Manifest{OriginalFilename: name}
707-
err := manifest.UnmarshalJSON([]byte(fmt.Sprintf(`
708-
{
709-
"apiVersion": "v1",
710-
"kind": "ConfigMap",
711-
"metadata": {
712-
"name": "%s",
713-
"namespace": "default"
714-
}
715-
}
716-
`, name)))
717-
if err != nil {
706+
m := &manifest.Manifest{OriginalFilename: name}
707+
if err := m.UnmarshalJSON(configMapJSON(name)); err != nil {
718708
t.Fatalf("load %s: %v", name, err)
719709
}
720-
arr = append(arr, &Task{Manifest: manifest})
710+
arr = append(arr, &Task{Manifest: m})
721711
}
722712
return arr
723713
}
@@ -743,6 +733,8 @@ func TestRunGraph(t *testing.T) {
743733
{
744734
name: "nodes executed after dependencies",
745735
nodes: []*TaskNode{
736+
// In: prerequisites (by index)
737+
// Out: dependents (by index)
746738
{Tasks: tasks("c"), In: []int{3}},
747739
{Tasks: tasks("d", "e"), In: []int{3}},
748740
{Tasks: tasks("f"), In: []int{3}, Out: []int{4}},
@@ -773,6 +765,8 @@ func TestRunGraph(t *testing.T) {
773765
{
774766
name: "task error interrupts node processing",
775767
nodes: []*TaskNode{
768+
// In: prerequisites (by index)
769+
// Out: dependents (by index)
776770
{Tasks: tasks("c"), In: []int{2}},
777771
{Tasks: tasks("d"), In: []int{2}, Out: []int{3}},
778772
{Tasks: tasks("a", "b"), Out: []int{0, 1}},
@@ -799,6 +793,8 @@ func TestRunGraph(t *testing.T) {
799793
{
800794
name: "mid-task cancellation error interrupts node processing",
801795
nodes: []*TaskNode{
796+
// In: prerequisites (by index)
797+
// Out: dependents (by index)
802798
{Tasks: tasks("c"), In: []int{2}},
803799
{Tasks: tasks("d"), In: []int{2}, Out: []int{3}},
804800
{Tasks: tasks("a", "b"), Out: []int{0, 1}},
@@ -853,6 +849,8 @@ func TestRunGraph(t *testing.T) {
853849
{
854850
name: "task errors in parallel nodes both reported",
855851
nodes: []*TaskNode{
852+
// In: prerequisites (by index)
853+
// Out: dependents (by index)
856854
{Tasks: tasks("a"), Out: []int{1}},
857855
{Tasks: tasks("b"), In: []int{0}, Out: []int{2, 4, 8}},
858856
{Tasks: tasks("c1"), In: []int{1}, Out: []int{3}},
@@ -878,8 +876,10 @@ func TestRunGraph(t *testing.T) {
878876
wantErrs: []string{"error - c1", "error - f"},
879877
},
880878
{
881-
name: "cancelation without task errors is reported",
879+
name: "cancellation without task errors is reported",
882880
nodes: []*TaskNode{
881+
// In: prerequisites (by index)
882+
// Out: dependents (by index)
883883
{Tasks: tasks("a"), Out: []int{1}},
884884
{Tasks: tasks("b"), In: []int{0}},
885885
},
@@ -950,3 +950,17 @@ func TestRunGraph(t *testing.T) {
950950
})
951951
}
952952
}
953+
954+
func configMapJSON(name string) []byte {
955+
const cmTemplate = `
956+
{
957+
"apiVersion": "v1",
958+
"kind": "ConfigMap",
959+
"metadata": {
960+
"name": "%s",
961+
"namespace": "default"
962+
}
963+
}
964+
`
965+
return []byte(fmt.Sprintf(cmTemplate, name))
966+
}

0 commit comments

Comments
 (0)