Skip to content

Commit 81a1f12

Browse files
authored
Merge pull request kubernetes#70277 from kdada/master
Fix goroutine leak of wait.poller
2 parents 68451f3 + 2306eb4 commit 81a1f12

File tree

2 files changed

+60
-20
lines changed

2 files changed

+60
-20
lines changed

staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -359,18 +359,30 @@ type WaitFunc func(done <-chan struct{}) <-chan struct{}
359359
// ErrWaitTimeout will be returned if the channel is closed without fn ever
360360
// returning true.
361361
func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
362-
c := wait(done)
362+
stopCh := make(chan struct{})
363+
once := sync.Once{}
364+
closeCh := func() {
365+
once.Do(func() {
366+
close(stopCh)
367+
})
368+
}
369+
defer closeCh()
370+
c := wait(stopCh)
363371
for {
364-
_, open := <-c
365-
ok, err := fn()
366-
if err != nil {
367-
return err
368-
}
369-
if ok {
370-
return nil
371-
}
372-
if !open {
373-
break
372+
select {
373+
case _, open := <-c:
374+
ok, err := fn()
375+
if err != nil {
376+
return err
377+
}
378+
if ok {
379+
return nil
380+
}
381+
if !open {
382+
return ErrWaitTimeout
383+
}
384+
case <-done:
385+
closeCh()
374386
}
375387
}
376388
return ErrWaitTimeout

staging/src/k8s.io/apimachinery/pkg/util/wait/wait_test.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -456,18 +456,46 @@ func TestWaitFor(t *testing.T) {
456456
}
457457
}
458458

459-
func TestWaitForWithDelay(t *testing.T) {
460-
done := make(chan struct{})
461-
defer close(done)
462-
WaitFor(poller(time.Millisecond, ForeverTestTimeout), func() (bool, error) {
459+
func TestWaitForWithClosedChannel(t *testing.T) {
460+
stopCh := make(chan struct{})
461+
close(stopCh)
462+
start := time.Now()
463+
err := WaitFor(poller(ForeverTestTimeout, ForeverTestTimeout), func() (bool, error) {
464+
return false, nil
465+
}, stopCh)
466+
duration := time.Now().Sub(start)
467+
// The WaitFor should return immediately, so the duration is close to 0s.
468+
if duration >= ForeverTestTimeout/2 {
469+
t.Errorf("expected short timeout duration")
470+
}
471+
// The interval of the poller is ForeverTestTimeout, so the WaitFor should always return ErrWaitTimeout.
472+
if err != ErrWaitTimeout {
473+
t.Errorf("expected ErrWaitTimeout from WaitFunc")
474+
}
475+
}
476+
477+
// TestWaitForClosesStopCh verifies that after the condition func returns true, WaitFor() closes the stop channel it supplies to the WaitFunc.
478+
func TestWaitForClosesStopCh(t *testing.T) {
479+
stopCh := make(chan struct{})
480+
defer close(stopCh)
481+
waitFunc := poller(time.Millisecond, ForeverTestTimeout)
482+
var doneCh <-chan struct{}
483+
484+
WaitFor(func(done <-chan struct{}) <-chan struct{} {
485+
doneCh = done
486+
return waitFunc(done)
487+
}, func() (bool, error) {
463488
time.Sleep(10 * time.Millisecond)
464489
return true, nil
465-
}, done)
466-
// If polling goroutine doesn't see the done signal it will leak timers.
490+
}, stopCh)
491+
// The polling goroutine should be closed after WaitFor returning.
467492
select {
468-
case done <- struct{}{}:
469-
case <-time.After(ForeverTestTimeout):
470-
t.Errorf("expected an ack of the done signal.")
493+
case _, ok := <-doneCh:
494+
if ok {
495+
t.Errorf("expected closed channel after WaitFunc returning")
496+
}
497+
default:
498+
t.Errorf("expected an ack of the done signal")
471499
}
472500
}
473501

0 commit comments

Comments
 (0)