Skip to content

Commit 95d0010

Browse files
committed
task graph: minor code cleanups
1 parent e20e03f commit 95d0010

File tree

2 files changed

+29
-29
lines changed

2 files changed

+29
-29
lines changed

pkg/payload/task_graph.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ type TaskNode struct {
139139
Out []int
140140
}
141141

142-
func (n TaskNode) String() string {
142+
func (n *TaskNode) String() string {
143143
var arr []string
144144
for _, t := range n.Tasks {
145145
if len(t.Manifest.OriginalFilename) > 0 {
@@ -474,16 +474,16 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
474474
}
475475
for i := 0; i < maxParallelism; i++ {
476476
wg.Add(1)
477-
go func(ctx context.Context, job int) {
477+
go func(ctx context.Context, worker int) {
478478
defer utilruntime.HandleCrash()
479479
defer wg.Done()
480480
for {
481481
select {
482482
case <-ctx.Done():
483-
klog.V(2).Infof("Canceled worker %d while waiting for work", job)
483+
klog.V(2).Infof("Worker %d: Received cancel signal while waiting for work", worker)
484484
return
485485
case runTask := <-workCh:
486-
klog.V(2).Infof("Running %d on worker %d", runTask.index, job)
486+
klog.V(2).Infof("Worker %d: Running job %d (with %d tasks)", worker, runTask.index, len(runTask.tasks))
487487
err := fn(ctx, runTask.tasks)
488488
resultCh <- taskStatus{index: runTask.index, error: err}
489489
}
@@ -492,9 +492,10 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
492492
}
493493

494494
var inflight int
495-
nextNode := getNextNode()
496-
done := false
495+
var done bool
496+
497497
for !done {
498+
nextNode := getNextNode()
498499
switch {
499500
case ctx.Err() == nil && nextNode >= 0: // push a task or collect a result
500501
select {
@@ -522,18 +523,16 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
522523
default: // no work to push and nothing in flight. We're done
523524
done = true
524525
}
525-
if !done {
526-
nextNode = getNextNode()
527-
}
528526
}
529527

530528
cancelFn()
531529
wg.Wait()
532-
klog.V(2).Infof("Workers finished")
530+
klog.V(2).Info("Workers finished")
533531

534532
var errs []error
535533
var firstIncompleteNode *TaskNode
536-
incompleteCount := 0
534+
var incompleteCount int
535+
537536
for i, result := range results {
538537
if result == nil {
539538
if firstIncompleteNode == nil && len(graph.Nodes[i].Tasks) > 0 {
@@ -552,9 +551,6 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
552551
}
553552
}
554553

555-
klog.V(2).Infof("Result of work: %v", errs)
556-
if len(errs) > 0 {
557-
return errs
558-
}
559-
return nil
554+
klog.V(2).Infof("Result of work: errs=%v", errs)
555+
return errs
560556
}

pkg/payload/task_graph_test.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -703,21 +703,11 @@ func TestRunGraph(t *testing.T) {
703703
tasks := func(names ...string) []*Task {
704704
var arr []*Task
705705
for _, name := range names {
706-
manifest := &manifest.Manifest{OriginalFilename: name}
707-
err := manifest.UnmarshalJSON([]byte(fmt.Sprintf(`
708-
{
709-
"apiVersion": "v1",
710-
"kind": "ConfigMap",
711-
"metadata": {
712-
"name": "%s",
713-
"namespace": "default"
714-
}
715-
}
716-
`, name)))
717-
if err != nil {
706+
m := &manifest.Manifest{OriginalFilename: name}
707+
if err := m.UnmarshalJSON(configMapJSON(name)); err != nil {
718708
t.Fatalf("load %s: %v", name, err)
719709
}
720-
arr = append(arr, &Task{Manifest: manifest})
710+
arr = append(arr, &Task{Manifest: m})
721711
}
722712
return arr
723713
}
@@ -960,3 +950,17 @@ func TestRunGraph(t *testing.T) {
960950
})
961951
}
962952
}
953+
954+
func configMapJSON(name string) []byte {
955+
const cmTemplate = `
956+
{
957+
"apiVersion": "v1",
958+
"kind": "ConfigMap",
959+
"metadata": {
960+
"name": "%s",
961+
"namespace": "default"
962+
}
963+
}
964+
`
965+
return []byte(fmt.Sprintf(cmTemplate, name))
966+
}

0 commit comments

Comments
 (0)