
Commit 70db253

OCPBUGS-22442: Fix TestRunGraph/mid-task_cancellation_with_work_in_queue_does_not_deadlock flake
Occasionally, the test flaked with the following:

```
--- FAIL: TestRunGraph (1.04s)
    --- FAIL: TestRunGraph/mid-task_cancellation_with_work_in_queue_does_not_deadlock (0.01s)
        task_graph_test.go:943: unexpected error: [context canceled context canceled]
```

The failure happened because the test saw two `context canceled` errors, but expected only one such error:

```
errors: []string{"context canceled"}
```

The test processes a graph with two independent nodes, without parallelism:

```
nodes: []*TaskNode{
	{Tasks: tasks("a1", "a2", "a3")},
	{Tasks: tasks("b")},
},
maxParallelism: 1,
```

The test is configured to signal the cancellation in the middle of processing task `a2`, and expects:

- the `a2` task to return successfully
- the `a3` task to start, but return the `context canceled` error
- the `b` task to never start

The problem is the non-determinism of the Go `select` statement: when multiple `case` branches are satisfied, one of them is chosen nondeterministically. The producer/consumer structure in the graph processing looks like the following:

```go
// consumers
for i := 0; i < maxParallelism; i++ {
	go func(ctx context.Context, worker int) {
		for {
			select {
			case <-ctx.Done():
				return
			case runTask := <-workCh:
				err := fn(ctx, runTask.tasks)
				resultCh <- taskStatus{index: runTask.index, error: err}
			}
		}
	}(ctx, i)
}

// producer
for !done {
	nextNode := getNextNode()
	switch {
	case ctx.Err() == nil && nextNode >= 0:
		// push a task or collect a result
		select {
		case workCh <- runTasks{index: nextNode, tasks: graph.Nodes[nextNode].Tasks}:
			submitted[nextNode] = true
			inflight++
		case result := <-resultCh:
			results[result.index] = &result
			inflight--
		case <-ctx.Done():
		}
	case inflight > 0:
		// no work available to push; collect results
		select {
		case result := <-resultCh:
			results[result.index] = &result
			inflight--
		case <-ctx.Done():
			select {
			case runTask := <-workCh:
				// workers canceled, so remove any work from the queue ourselves
				inflight--
				submitted[runTask.index] = false // TODO: This does not seem needed
			default:
			}
		}
	default:
		// no work to push and nothing in flight. We're done
		done = true
	}
}
```

Because of the nondeterminism, the following trace was possible:

1. The producer creates a job for the first node `[a1, a2, a3]` and puts it to `workCh`.
2. The producer creates a job for the second node `[b]` and waits (`workCh` is full, `resultCh` is empty, ctx is not canceled).
3. A worker consumes the first job, and while `a2` is processed, ctx gets canceled.
4. The producer puts the job for the second node to `workCh`.
5. The worker completes processing the first job, returning an error on trying to process task `a3`.
6. The worker starts another loop iteration and hits the `select`, where both branches are ready: `ctx` is canceled and there is an item in `workCh`.
7. The worker consumes the second job, processes task `b`, and returns a second error.
8. Both errors are consumed by the producer, but the test expected just one error, so the test fails.

There are two possible paths that previously prevented us from hitting the problem:

1. Either the consumer selected the `<-ctx.Done()` branch in (6) above: `b` is never processed and `workCh` is drained by the producer.
2. Or the producer managed to drain `workCh` before the worker got to it.

Note that there is a similar non-determinism on the producer side. When the context is canceled, the collection half of the cycle (`case inflight > 0`) can select between its two branches while there are results in `resultCh`, which means it can take the "draining" branch even when there is nothing to drain, even multiple times, until it is lucky enough times to collect all results in `resultCh`.

The fix for the flake is to detect the cancellation at the start of the worker loop. This narrows the race window to a really short time between the check and the `select`.
1 parent 90da0da commit 70db253

File tree

2 files changed (+11, -1 lines)


pkg/payload/task_graph.go

Lines changed: 11 additions & 0 deletions
```diff
@@ -478,6 +478,17 @@ func RunGraph(ctx context.Context, graph *TaskGraph, maxParallelism int, fn func
 			defer utilruntime.HandleCrash()
 			defer wg.Done()
 			for {
+				// First, make sure the worker was not signalled to cancel. This may seem redundant with the <-ctx.Done() below,
+				// but it is necessary to properly handle the case where cancellation occurs while the worker is processing a
+				// task. Go `select` is nondeterministic when multiple cases are ready, so when the worker finishes a task,
+				// starts another loop and both the ctx.Done() and workCh cases are ready, Go could choose either of them,
+				// potentially starting a new task even though the worker was supposed to stop. Checking cancellation here makes
+				// the race window much smaller (cancellation would need to happen between this check and the select).
+				if ctx.Err() != nil {
+					klog.V(2).Infof("Worker %d: Received cancel signal", worker)
+					return
+				}
+
 				select {
 				case <-ctx.Done():
 					klog.V(2).Infof("Worker %d: Received cancel signal while waiting for work", worker)
```

pkg/payload/task_graph_test.go

Lines changed: 0 additions & 1 deletion
```diff
@@ -898,7 +898,6 @@ func TestRunGraph(t *testing.T) {
 					return err
 				}
 				cancelFn()
-				// time.Sleep(time.Second)
 				return nil
 			},
 			"*": func(t *testing.T, name string, ctx context.Context, cancelFn func()) error {
```
