Skip to content

Commit 206ecb4

Browse files
authored
Merge pull request kubernetes#71326 from shomron/issue-71277-polluntil-leak
Fix goroutine leak in pkg/util/wait PollUntil()
2 parents 3dfe907 + 0869e63 commit 206ecb4

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,25 @@ func (b *Backoff) Step() time.Duration {
250250
return duration
251251
}
252252

253+
// contextForChannel derives a child context from a parent channel.
//
// The derived context's Done channel is closed when the returned cancel function
// is called or when the parent channel is closed, whichever happens first.
//
// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
func contextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	// Bridge the parent channel to the context. The goroutine returns as soon
	// as either side fires, so it only outlives the caller when parentCh never
	// fires and cancel is never called -- hence the "always call cancel" rule.
	go func() {
		select {
		case <-parentCh:
			cancel()
		case <-ctx.Done():
			// Cancelled by the caller; nothing left to propagate.
		}
	}()
	return ctx, cancel
}
271+
253272
// ExponentialBackoff repeats a condition check with exponential backoff.
254273
//
255274
// It checks the condition up to Steps times, increasing the wait by multiplying
@@ -353,7 +372,9 @@ func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) erro
353372
// PollUntil always waits interval before the first run of 'condition'.
354373
// 'condition' will always be invoked at least once.
355374
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
356-
return WaitFor(poller(interval, 0), condition, stopCh)
375+
ctx, cancel := contextForChannel(stopCh)
376+
defer cancel()
377+
return WaitFor(poller(interval, 0), condition, ctx.Done())
357378
}
358379

359380
// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
@@ -422,7 +443,9 @@ func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
422443
// timeout has elapsed and then closes the channel.
423444
//
424445
// Over very short intervals you may receive no ticks before the channel is
425-
// closed. A timeout of 0 is interpreted as an infinity.
446+
// closed. A timeout of 0 is interpreted as infinite, and in such a case
447+
// it would be the caller's responsibility to close the done channel.
448+
// Failure to do so would result in a leaked goroutine.
426449
//
427450
// Output ticks are not buffered. If the channel is not ready to receive an
428451
// item, the tick is skipped.

staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,3 +664,33 @@ func TestBackoff_Step(t *testing.T) {
664664
}
665665
}
666666
}
667+
668+
func TestContextForChannel(t *testing.T) {
669+
var wg sync.WaitGroup
670+
parentCh := make(chan struct{})
671+
done := make(chan struct{})
672+
673+
for i := 0; i < 3; i++ {
674+
wg.Add(1)
675+
go func() {
676+
defer wg.Done()
677+
ctx, cancel := contextForChannel(parentCh)
678+
defer cancel()
679+
<-ctx.Done()
680+
}()
681+
}
682+
683+
go func() {
684+
wg.Wait()
685+
close(done)
686+
}()
687+
688+
// Closing parent channel should cancel all children contexts
689+
close(parentCh)
690+
691+
select {
692+
case <-done:
693+
case <-time.After(ForeverTestTimeout):
694+
t.Errorf("unexepcted timeout waiting for parent to cancel child contexts")
695+
}
696+
}

0 commit comments

Comments
 (0)