Skip to content

Commit 389e60f

Browse files
authored
Merge pull request kubernetes#120980 from p0lyn0mial/upstream-client-go-close-watcher
reflector: close an established watcher when the StopCh was closed
2 parents 6a84edb + 26f113b commit 389e60f

File tree

2 files changed

+27
-0
lines changed

2 files changed

+27
-0
lines changed

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,11 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
397397
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
398398
select {
399399
case <-stopCh:
400+
// we can only end up here when the stopCh
401+
// was closed after a successful watchlist or list request
402+
if w != nil {
403+
w.Stop()
404+
}
400405
return nil
401406
default:
402407
}

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"testing"
2929
"time"
3030

31+
"github.com/stretchr/testify/require"
32+
3133
v1 "k8s.io/api/core/v1"
3234
apierrors "k8s.io/apimachinery/pkg/api/errors"
3335
"k8s.io/apimachinery/pkg/api/meta"
@@ -124,6 +126,26 @@ func TestReflectorResyncChan(t *testing.T) {
124126
}
125127
}
126128

129+
// TestEstablishedWatchStoppedAfterStopCh ensures that
130+
// an established watch will be closed right after
131+
// the StopCh was also closed.
132+
func TestEstablishedWatchStoppedAfterStopCh(t *testing.T) {
133+
ctx, ctxCancel := context.WithCancel(context.TODO())
134+
ctxCancel()
135+
w := watch.NewFake()
136+
require.False(t, w.IsStopped())
137+
138+
// w is stopped when the stopCh is closed
139+
target := NewReflector(nil, &v1.Pod{}, nil, 0)
140+
err := target.watch(w, ctx.Done(), nil)
141+
require.NoError(t, err)
142+
require.True(t, w.IsStopped())
143+
144+
// noop when the w is nil and the ctx is closed
145+
err = target.watch(nil, ctx.Done(), nil)
146+
require.NoError(t, err)
147+
}
148+
127149
func BenchmarkReflectorResyncChanMany(b *testing.B) {
128150
s := NewStore(MetaNamespaceKeyFunc)
129151
g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond)

0 commit comments

Comments
 (0)