Skip to content

Commit 5bf1e95

Browse files
authored
Merge pull request kubernetes#125299 from karlkfi/karl-reflector-fix-2
Improve Reflector unit tests
2 parents 9f13e3a + ab5aa47 commit 5bf1e95

File tree

2 files changed

+248
-40
lines changed

2 files changed

+248
-40
lines changed

staging/src/k8s.io/apimachinery/pkg/watch/watch.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,3 +322,21 @@ func (pw *ProxyWatcher) ResultChan() <-chan Event {
322322
func (pw *ProxyWatcher) StopChan() <-chan struct{} {
323323
return pw.stopCh
324324
}
325+
326+
// MockWatcher implements watch.Interface with mockable functions.
327+
type MockWatcher struct {
328+
StopFunc func()
329+
ResultChanFunc func() <-chan Event
330+
}
331+
332+
var _ Interface = &MockWatcher{}
333+
334+
// Stop calls StopFunc
335+
func (mw MockWatcher) Stop() {
336+
mw.StopFunc()
337+
}
338+
339+
// ResultChan calls ResultChanFunc
340+
func (mw MockWatcher) ResultChan() <-chan Event {
341+
return mw.ResultChanFunc()
342+
}

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 230 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"testing"
2929
"time"
3030

31+
"github.com/stretchr/testify/assert"
3132
"github.com/stretchr/testify/require"
3233

3334
v1 "k8s.io/api/core/v1"
@@ -97,19 +98,35 @@ func TestRunUntil(t *testing.T) {
9798
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
9899
},
99100
}
100-
go r.Run(stopCh)
101+
doneCh := make(chan struct{})
102+
go func() {
103+
defer close(doneCh)
104+
r.Run(stopCh)
105+
}()
101106
// Synchronously add a dummy pod into the watch channel so we
102107
// know the RunUntil go routine is in the watch handler.
103108
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
109+
104110
close(stopCh)
105-
select {
106-
case _, ok := <-fw.ResultChan():
107-
if ok {
108-
t.Errorf("Watch channel left open after stopping the watch")
111+
resultCh := fw.ResultChan()
112+
for {
113+
select {
114+
case <-doneCh:
115+
if resultCh == nil {
116+
return // both closed
117+
}
118+
doneCh = nil
119+
case _, ok := <-resultCh:
120+
if ok {
121+
t.Fatalf("Watch channel left open after stopping the watch")
122+
}
123+
if doneCh == nil {
124+
return // both closed
125+
}
126+
resultCh = nil
127+
case <-time.After(wait.ForeverTestTimeout):
128+
t.Fatalf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
109129
}
110-
case <-time.After(wait.ForeverTestTimeout):
111-
t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
112-
break
113130
}
114131
}
115132

@@ -126,24 +143,59 @@ func TestReflectorResyncChan(t *testing.T) {
126143
}
127144
}
128145

129-
// TestEstablishedWatchStoppedAfterStopCh ensures that
130-
// an established watch will be closed right after
131-
// the StopCh was also closed.
132-
func TestEstablishedWatchStoppedAfterStopCh(t *testing.T) {
133-
ctx, ctxCancel := context.WithCancel(context.TODO())
134-
ctxCancel()
135-
w := watch.NewFake()
136-
require.False(t, w.IsStopped())
137-
138-
// w is stopped when the stopCh is closed
139-
target := NewReflector(nil, &v1.Pod{}, nil, 0)
140-
err := target.watch(w, ctx.Done(), nil)
146+
// TestReflectorWatchStoppedBefore ensures that neither List nor Watch are
147+
// called if the stop channel is closed before Reflector.watch is called.
148+
func TestReflectorWatchStoppedBefore(t *testing.T) {
149+
stopCh := make(chan struct{})
150+
close(stopCh)
151+
152+
lw := &ListWatch{
153+
ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
154+
t.Fatal("ListFunc called unexpectedly")
155+
return nil, nil
156+
},
157+
WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
158+
// If WatchFunc is never called, the watcher it returns doesn't need to be stopped.
159+
t.Fatal("WatchFunc called unexpectedly")
160+
return nil, nil
161+
},
162+
}
163+
target := NewReflector(lw, &v1.Pod{}, nil, 0)
164+
165+
err := target.watch(nil, stopCh, nil)
141166
require.NoError(t, err)
142-
require.True(t, w.IsStopped())
167+
}
168+
169+
// TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if
170+
// the stop channel is closed after Reflector.watch has started watching.
171+
func TestReflectorWatchStoppedAfter(t *testing.T) {
172+
stopCh := make(chan struct{})
173+
174+
var watchers []*watch.FakeWatcher
143175

144-
// noop when the w is nil and the ctx is closed
145-
err = target.watch(nil, ctx.Done(), nil)
176+
lw := &ListWatch{
177+
ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
178+
t.Fatal("ListFunc called unexpectedly")
179+
return nil, nil
180+
},
181+
WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
182+
// Simulate the stop channel being closed after watching has started
183+
go func() {
184+
time.Sleep(10 * time.Millisecond)
185+
close(stopCh)
186+
}()
187+
// Use a fake watcher that never sends events
188+
w := watch.NewFake()
189+
watchers = append(watchers, w)
190+
return w, nil
191+
},
192+
}
193+
target := NewReflector(lw, &v1.Pod{}, nil, 0)
194+
195+
err := target.watch(nil, stopCh, nil)
146196
require.NoError(t, err)
197+
require.Equal(t, 1, len(watchers))
198+
require.True(t, watchers[0].IsStopped())
147199
}
148200

149201
func BenchmarkReflectorResyncChanMany(b *testing.B) {
@@ -158,22 +210,148 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) {
158210
}
159211
}
160212

161-
func TestReflectorWatchHandlerError(t *testing.T) {
213+
// TestReflectorHandleWatchStoppedBefore ensures that handleWatch stops when
214+
// stopCh is already closed before handleWatch was called. It also ensures that
215+
// ResultChan is only called once and that Stop is called after ResultChan.
216+
func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
162217
s := NewStore(MetaNamespaceKeyFunc)
163218
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
164-
fw := watch.NewFake()
165-
go func() {
166-
fw.Stop()
167-
}()
219+
stopCh := make(chan struct{})
220+
// Simulate the watch channel being closed before the watchHandler is called
221+
close(stopCh)
222+
var calls []string
223+
resultCh := make(chan watch.Event)
224+
fw := watch.MockWatcher{
225+
StopFunc: func() {
226+
calls = append(calls, "Stop")
227+
close(resultCh)
228+
},
229+
ResultChanFunc: func() <-chan watch.Event {
230+
calls = append(calls, "ResultChan")
231+
return resultCh
232+
},
233+
}
234+
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh)
235+
if err == nil {
236+
t.Errorf("unexpected non-error")
237+
}
238+
// Ensure the watcher methods are called exactly once in this exact order.
239+
// TODO(karlkfi): Fix watchHandler to call Stop()
240+
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
241+
assert.Equal(t, []string{"ResultChan"}, calls)
242+
}
243+
244+
// TestReflectorHandleWatchStoppedAfter ensures that handleWatch stops when
245+
// stopCh is closed after handleWatch was called. It also ensures that
246+
// ResultChan is only called once and that Stop is called after ResultChan.
247+
func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
248+
s := NewStore(MetaNamespaceKeyFunc)
249+
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
250+
var calls []string
251+
stopCh := make(chan struct{})
252+
resultCh := make(chan watch.Event)
253+
fw := watch.MockWatcher{
254+
StopFunc: func() {
255+
calls = append(calls, "Stop")
256+
close(resultCh)
257+
},
258+
ResultChanFunc: func() <-chan watch.Event {
259+
calls = append(calls, "ResultChan")
260+
resultCh = make(chan watch.Event)
261+
// Simulate the watch handler being stopped asynchronously by the
262+
// caller, after watching has started.
263+
go func() {
264+
time.Sleep(10 * time.Millisecond)
265+
close(stopCh)
266+
}()
267+
return resultCh
268+
},
269+
}
270+
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh)
271+
if err == nil {
272+
t.Errorf("unexpected non-error")
273+
}
274+
// Ensure the watcher methods are called exactly once in this exact order.
275+
// TODO(karlkfi): Fix watchHandler to call Stop()
276+
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
277+
assert.Equal(t, []string{"ResultChan"}, calls)
278+
}
279+
280+
// TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch
281+
// stops when the result channel is closed before handleWatch was called.
282+
func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
283+
s := NewStore(MetaNamespaceKeyFunc)
284+
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
285+
var calls []string
286+
resultCh := make(chan watch.Event)
287+
fw := watch.MockWatcher{
288+
StopFunc: func() {
289+
calls = append(calls, "Stop")
290+
},
291+
ResultChanFunc: func() <-chan watch.Event {
292+
calls = append(calls, "ResultChan")
293+
return resultCh
294+
},
295+
}
296+
// Simulate the result channel being closed by the producer before handleWatch is called.
297+
close(resultCh)
168298
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
169299
if err == nil {
170300
t.Errorf("unexpected non-error")
171301
}
302+
// Ensure the watcher methods are called exactly once in this exact order.
303+
// TODO(karlkfi): Fix watchHandler to call Stop()
304+
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
305+
assert.Equal(t, []string{"ResultChan"}, calls)
306+
}
307+
308+
// TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch
309+
// stops when the result channel is closed after handleWatch has started watching.
310+
func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
311+
s := NewStore(MetaNamespaceKeyFunc)
312+
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
313+
var calls []string
314+
resultCh := make(chan watch.Event)
315+
fw := watch.MockWatcher{
316+
StopFunc: func() {
317+
calls = append(calls, "Stop")
318+
},
319+
ResultChanFunc: func() <-chan watch.Event {
320+
calls = append(calls, "ResultChan")
321+
resultCh = make(chan watch.Event)
322+
// Simulate the result channel being closed by the producer, after
323+
// watching has started.
324+
go func() {
325+
time.Sleep(10 * time.Millisecond)
326+
close(resultCh)
327+
}()
328+
return resultCh
329+
},
330+
}
331+
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
332+
if err == nil {
333+
t.Errorf("unexpected non-error")
334+
}
335+
// Ensure the watcher methods are called exactly once in this exact order.
336+
// TODO(karlkfi): Fix watchHandler to call Stop()
337+
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
338+
assert.Equal(t, []string{"ResultChan"}, calls)
172339
}
173340

174341
func TestReflectorWatchHandler(t *testing.T) {
175342
s := NewStore(MetaNamespaceKeyFunc)
176343
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
344+
// Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop
345+
// watching after all the events have been consumed. This avoids race
346+
// conditions which can happen if the producer calls Stop(), instead of the
347+
// consumer.
348+
stopCh := make(chan struct{})
349+
setLastSyncResourceVersion := func(rv string) {
350+
g.setLastSyncResourceVersion(rv)
351+
if rv == "32" {
352+
close(stopCh)
353+
}
354+
}
177355
fw := watch.NewFake()
178356
s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
179357
s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
@@ -184,15 +362,16 @@ func TestReflectorWatchHandler(t *testing.T) {
184362
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
185363
fw.Stop()
186364
}()
187-
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
188-
if err != nil {
365+
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh)
366+
if !errors.Is(err, errorStopRequested) {
189367
t.Errorf("unexpected error %v", err)
190368
}
191369

192370
mkPod := func(id string, rv string) *v1.Pod {
193371
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
194372
}
195373

374+
// Validate that the Store was updated by the events
196375
table := []struct {
197376
Pod *v1.Pod
198377
exists bool
@@ -215,12 +394,7 @@ func TestReflectorWatchHandler(t *testing.T) {
215394
}
216395
}
217396

218-
// RV should send the last version we see.
219-
if e, a := "32", g.LastSyncResourceVersion(); e != a {
220-
t.Errorf("expected %v, got %v", e, a)
221-
}
222-
223-
// last sync resource version should be the last version synced with store
397+
// Validate that setLastSyncResourceVersion was called with the RV from the last event.
224398
if e, a := "32", g.LastSyncResourceVersion(); e != a {
225399
t.Errorf("expected %v, got %v", e, a)
226400
}
@@ -230,8 +404,8 @@ func TestReflectorStopWatch(t *testing.T) {
230404
s := NewStore(MetaNamespaceKeyFunc)
231405
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
232406
fw := watch.NewFake()
233-
stopWatch := make(chan struct{}, 1)
234-
stopWatch <- struct{}{}
407+
stopWatch := make(chan struct{})
408+
close(stopWatch)
235409
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch)
236410
if err != errorStopRequested {
237411
t.Errorf("expected stop error, got %q", err)
@@ -361,6 +535,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
361535
}
362536
}
363537
watchRet, watchErr := item.events, item.watchErr
538+
stopCh := make(chan struct{})
364539
lw := &testLW{
365540
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
366541
if watchErr != nil {
@@ -372,7 +547,13 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
372547
for _, e := range watchRet {
373548
fw.Action(e.Type, e.Object)
374549
}
375-
fw.Stop()
550+
// Because FakeWatcher doesn't buffer events, it's safe to
551+
// close the stop channel immediately without missing events.
552+
// But usually, the event producer would instead close the
553+
// result channel, and wait for the consumer to stop the
554+
// watcher, to avoid race conditions.
555+
// TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh)
556+
close(stopCh)
376557
}()
377558
return fw, nil
378559
},
@@ -381,7 +562,16 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
381562
},
382563
}
383564
r := NewReflector(lw, &v1.Pod{}, s, 0)
384-
r.ListAndWatch(wait.NeverStop)
565+
err := r.ListAndWatch(stopCh)
566+
if item.listErr != nil && !errors.Is(err, item.listErr) {
567+
t.Errorf("unexpected ListAndWatch error: %v", err)
568+
}
569+
if item.watchErr != nil && !errors.Is(err, item.watchErr) {
570+
t.Errorf("unexpected ListAndWatch error: %v", err)
571+
}
572+
if item.listErr == nil && item.watchErr == nil {
573+
assert.NoError(t, err)
574+
}
385575
}
386576
}
387577

0 commit comments

Comments
 (0)