@@ -28,6 +28,7 @@ import (
28
28
"testing"
29
29
"time"
30
30
31
+ "github.com/stretchr/testify/assert"
31
32
"github.com/stretchr/testify/require"
32
33
33
34
v1 "k8s.io/api/core/v1"
@@ -97,19 +98,35 @@ func TestRunUntil(t *testing.T) {
97
98
return & v1.PodList {ListMeta : metav1.ListMeta {ResourceVersion : "1" }}, nil
98
99
},
99
100
}
100
- go r .Run (stopCh )
101
+ doneCh := make (chan struct {})
102
+ go func () {
103
+ defer close (doneCh )
104
+ r .Run (stopCh )
105
+ }()
101
106
// Synchronously add a dummy pod into the watch channel so we
102
107
// know the RunUntil go routine is in the watch handler.
103
108
fw .Add (& v1.Pod {ObjectMeta : metav1.ObjectMeta {Name : "bar" }})
109
+
104
110
close (stopCh )
105
- select {
106
- case _ , ok := <- fw .ResultChan ():
107
- if ok {
108
- t .Errorf ("Watch channel left open after stopping the watch" )
111
+ resultCh := fw .ResultChan ()
112
+ for {
113
+ select {
114
+ case <- doneCh :
115
+ if resultCh == nil {
116
+ return // both closed
117
+ }
118
+ doneCh = nil
119
+ case _ , ok := <- resultCh :
120
+ if ok {
121
+ t .Fatalf ("Watch channel left open after stopping the watch" )
122
+ }
123
+ if doneCh == nil {
124
+ return // both closed
125
+ }
126
+ resultCh = nil
127
+ case <- time .After (wait .ForeverTestTimeout ):
128
+ t .Fatalf ("the cancellation is at least %s late" , wait .ForeverTestTimeout .String ())
109
129
}
110
- case <- time .After (wait .ForeverTestTimeout ):
111
- t .Errorf ("the cancellation is at least %s late" , wait .ForeverTestTimeout .String ())
112
- break
113
130
}
114
131
}
115
132
@@ -126,24 +143,59 @@ func TestReflectorResyncChan(t *testing.T) {
126
143
}
127
144
}
128
145
129
- // TestEstablishedWatchStoppedAfterStopCh ensures that
130
- // an established watch will be closed right after
131
- // the StopCh was also closed.
132
- func TestEstablishedWatchStoppedAfterStopCh (t * testing.T ) {
133
- ctx , ctxCancel := context .WithCancel (context .TODO ())
134
- ctxCancel ()
135
- w := watch .NewFake ()
136
- require .False (t , w .IsStopped ())
137
-
138
- // w is stopped when the stopCh is closed
139
- target := NewReflector (nil , & v1.Pod {}, nil , 0 )
140
- err := target .watch (w , ctx .Done (), nil )
146
+ // TestReflectorWatchStoppedBefore ensures that neither List nor Watch are
147
+ // called if the stop channel is closed before Reflector.watch is called.
148
+ func TestReflectorWatchStoppedBefore (t * testing.T ) {
149
+ stopCh := make (chan struct {})
150
+ close (stopCh )
151
+
152
+ lw := & ListWatch {
153
+ ListFunc : func (_ metav1.ListOptions ) (runtime.Object , error ) {
154
+ t .Fatal ("ListFunc called unexpectedly" )
155
+ return nil , nil
156
+ },
157
+ WatchFunc : func (_ metav1.ListOptions ) (watch.Interface , error ) {
158
+ // If WatchFunc is never called, the watcher it returns doesn't need to be stopped.
159
+ t .Fatal ("WatchFunc called unexpectedly" )
160
+ return nil , nil
161
+ },
162
+ }
163
+ target := NewReflector (lw , & v1.Pod {}, nil , 0 )
164
+
165
+ err := target .watch (nil , stopCh , nil )
141
166
require .NoError (t , err )
142
- require .True (t , w .IsStopped ())
167
+ }
168
+
169
+ // TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if
170
+ // the stop channel is closed after Reflector.watch has started watching.
171
+ func TestReflectorWatchStoppedAfter (t * testing.T ) {
172
+ stopCh := make (chan struct {})
173
+
174
+ var watchers []* watch.FakeWatcher
143
175
144
- // noop when the w is nil and the ctx is closed
145
- err = target .watch (nil , ctx .Done (), nil )
176
+ lw := & ListWatch {
177
+ ListFunc : func (_ metav1.ListOptions ) (runtime.Object , error ) {
178
+ t .Fatal ("ListFunc called unexpectedly" )
179
+ return nil , nil
180
+ },
181
+ WatchFunc : func (_ metav1.ListOptions ) (watch.Interface , error ) {
182
+ // Simulate the stop channel being closed after watching has started
183
+ go func () {
184
+ time .Sleep (10 * time .Millisecond )
185
+ close (stopCh )
186
+ }()
187
+ // Use a fake watcher that never sends events
188
+ w := watch .NewFake ()
189
+ watchers = append (watchers , w )
190
+ return w , nil
191
+ },
192
+ }
193
+ target := NewReflector (lw , & v1.Pod {}, nil , 0 )
194
+
195
+ err := target .watch (nil , stopCh , nil )
146
196
require .NoError (t , err )
197
+ require .Equal (t , 1 , len (watchers ))
198
+ require .True (t , watchers [0 ].IsStopped ())
147
199
}
148
200
149
201
func BenchmarkReflectorResyncChanMany (b * testing.B ) {
@@ -158,22 +210,148 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) {
158
210
}
159
211
}
160
212
161
- func TestReflectorWatchHandlerError (t * testing.T ) {
213
+ // TestReflectorHandleWatchStoppedBefore ensures that handleWatch stops when
214
+ // stopCh is already closed before handleWatch was called. It also ensures that
215
+ // ResultChan is only called once and that Stop is called after ResultChan.
216
+ func TestReflectorHandleWatchStoppedBefore (t * testing.T ) {
162
217
s := NewStore (MetaNamespaceKeyFunc )
163
218
g := NewReflector (& testLW {}, & v1.Pod {}, s , 0 )
164
- fw := watch .NewFake ()
165
- go func () {
166
- fw .Stop ()
167
- }()
219
+ stopCh := make (chan struct {})
220
+ // Simulate the watch channel being closed before the watchHandler is called
221
+ close (stopCh )
222
+ var calls []string
223
+ resultCh := make (chan watch.Event )
224
+ fw := watch.MockWatcher {
225
+ StopFunc : func () {
226
+ calls = append (calls , "Stop" )
227
+ close (resultCh )
228
+ },
229
+ ResultChanFunc : func () <- chan watch.Event {
230
+ calls = append (calls , "ResultChan" )
231
+ return resultCh
232
+ },
233
+ }
234
+ err := watchHandler (time .Now (), fw , s , g .expectedType , g .expectedGVK , g .name , g .typeDescription , g .setLastSyncResourceVersion , nil , g .clock , nevererrc , stopCh )
235
+ if err == nil {
236
+ t .Errorf ("unexpected non-error" )
237
+ }
238
+ // Ensure the watcher methods are called exactly once in this exact order.
239
+ // TODO(karlkfi): Fix watchHandler to call Stop()
240
+ // assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
241
+ assert .Equal (t , []string {"ResultChan" }, calls )
242
+ }
243
+
244
+ // TestReflectorHandleWatchStoppedAfter ensures that handleWatch stops when
245
+ // stopCh is closed after handleWatch was called. It also ensures that
246
+ // ResultChan is only called once and that Stop is called after ResultChan.
247
+ func TestReflectorHandleWatchStoppedAfter (t * testing.T ) {
248
+ s := NewStore (MetaNamespaceKeyFunc )
249
+ g := NewReflector (& testLW {}, & v1.Pod {}, s , 0 )
250
+ var calls []string
251
+ stopCh := make (chan struct {})
252
+ resultCh := make (chan watch.Event )
253
+ fw := watch.MockWatcher {
254
+ StopFunc : func () {
255
+ calls = append (calls , "Stop" )
256
+ close (resultCh )
257
+ },
258
+ ResultChanFunc : func () <- chan watch.Event {
259
+ calls = append (calls , "ResultChan" )
260
+ resultCh = make (chan watch.Event )
261
+ // Simulate the watch handler being stopped asynchronously by the
262
+ // caller, after watching has started.
263
+ go func () {
264
+ time .Sleep (10 * time .Millisecond )
265
+ close (stopCh )
266
+ }()
267
+ return resultCh
268
+ },
269
+ }
270
+ err := watchHandler (time .Now (), fw , s , g .expectedType , g .expectedGVK , g .name , g .typeDescription , g .setLastSyncResourceVersion , nil , g .clock , nevererrc , stopCh )
271
+ if err == nil {
272
+ t .Errorf ("unexpected non-error" )
273
+ }
274
+ // Ensure the watcher methods are called exactly once in this exact order.
275
+ // TODO(karlkfi): Fix watchHandler to call Stop()
276
+ // assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
277
+ assert .Equal (t , []string {"ResultChan" }, calls )
278
+ }
279
+
280
+ // TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch
281
+ // stops when the result channel is closed before handleWatch was called.
282
+ func TestReflectorHandleWatchResultChanClosedBefore (t * testing.T ) {
283
+ s := NewStore (MetaNamespaceKeyFunc )
284
+ g := NewReflector (& testLW {}, & v1.Pod {}, s , 0 )
285
+ var calls []string
286
+ resultCh := make (chan watch.Event )
287
+ fw := watch.MockWatcher {
288
+ StopFunc : func () {
289
+ calls = append (calls , "Stop" )
290
+ },
291
+ ResultChanFunc : func () <- chan watch.Event {
292
+ calls = append (calls , "ResultChan" )
293
+ return resultCh
294
+ },
295
+ }
296
+ // Simulate the result channel being closed by the producer before handleWatch is called.
297
+ close (resultCh )
168
298
err := watchHandler (time .Now (), fw , s , g .expectedType , g .expectedGVK , g .name , g .typeDescription , g .setLastSyncResourceVersion , nil , g .clock , nevererrc , wait .NeverStop )
169
299
if err == nil {
170
300
t .Errorf ("unexpected non-error" )
171
301
}
302
+ // Ensure the watcher methods are called exactly once in this exact order.
303
+ // TODO(karlkfi): Fix watchHandler to call Stop()
304
+ // assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
305
+ assert .Equal (t , []string {"ResultChan" }, calls )
306
+ }
307
+
308
+ // TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch
309
+ // stops when the result channel is closed after handleWatch has started watching.
310
+ func TestReflectorHandleWatchResultChanClosedAfter (t * testing.T ) {
311
+ s := NewStore (MetaNamespaceKeyFunc )
312
+ g := NewReflector (& testLW {}, & v1.Pod {}, s , 0 )
313
+ var calls []string
314
+ resultCh := make (chan watch.Event )
315
+ fw := watch.MockWatcher {
316
+ StopFunc : func () {
317
+ calls = append (calls , "Stop" )
318
+ },
319
+ ResultChanFunc : func () <- chan watch.Event {
320
+ calls = append (calls , "ResultChan" )
321
+ resultCh = make (chan watch.Event )
322
+ // Simulate the result channel being closed by the producer, after
323
+ // watching has started.
324
+ go func () {
325
+ time .Sleep (10 * time .Millisecond )
326
+ close (resultCh )
327
+ }()
328
+ return resultCh
329
+ },
330
+ }
331
+ err := watchHandler (time .Now (), fw , s , g .expectedType , g .expectedGVK , g .name , g .typeDescription , g .setLastSyncResourceVersion , nil , g .clock , nevererrc , wait .NeverStop )
332
+ if err == nil {
333
+ t .Errorf ("unexpected non-error" )
334
+ }
335
+ // Ensure the watcher methods are called exactly once in this exact order.
336
+ // TODO(karlkfi): Fix watchHandler to call Stop()
337
+ // assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
338
+ assert .Equal (t , []string {"ResultChan" }, calls )
172
339
}
173
340
174
341
func TestReflectorWatchHandler (t * testing.T ) {
175
342
s := NewStore (MetaNamespaceKeyFunc )
176
343
g := NewReflector (& testLW {}, & v1.Pod {}, s , 0 )
344
+ // Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop
345
+ // watching after all the events have been consumed. This avoids race
346
+ // conditions which can happen if the producer calls Stop(), instead of the
347
+ // consumer.
348
+ stopCh := make (chan struct {})
349
+ setLastSyncResourceVersion := func (rv string ) {
350
+ g .setLastSyncResourceVersion (rv )
351
+ if rv == "32" {
352
+ close (stopCh )
353
+ }
354
+ }
177
355
fw := watch .NewFake ()
178
356
s .Add (& v1.Pod {ObjectMeta : metav1.ObjectMeta {Name : "foo" }})
179
357
s .Add (& v1.Pod {ObjectMeta : metav1.ObjectMeta {Name : "bar" }})
@@ -184,15 +362,16 @@ func TestReflectorWatchHandler(t *testing.T) {
184
362
fw .Add (& v1.Pod {ObjectMeta : metav1.ObjectMeta {Name : "baz" , ResourceVersion : "32" }})
185
363
fw .Stop ()
186
364
}()
187
- err := watchHandler (time .Now (), fw , s , g .expectedType , g .expectedGVK , g .name , g .typeDescription , g . setLastSyncResourceVersion , nil , g .clock , nevererrc , wait . NeverStop )
188
- if err != nil {
365
+ err := watchHandler (time .Now (), fw , s , g .expectedType , g .expectedGVK , g .name , g .typeDescription , setLastSyncResourceVersion , nil , g .clock , nevererrc , stopCh )
366
+ if ! errors . Is ( err , errorStopRequested ) {
189
367
t .Errorf ("unexpected error %v" , err )
190
368
}
191
369
192
370
mkPod := func (id string , rv string ) * v1.Pod {
193
371
return & v1.Pod {ObjectMeta : metav1.ObjectMeta {Name : id , ResourceVersion : rv }}
194
372
}
195
373
374
+ // Validate that the Store was updated by the events
196
375
table := []struct {
197
376
Pod * v1.Pod
198
377
exists bool
@@ -215,12 +394,7 @@ func TestReflectorWatchHandler(t *testing.T) {
215
394
}
216
395
}
217
396
218
- // RV should send the last version we see.
219
- if e , a := "32" , g .LastSyncResourceVersion (); e != a {
220
- t .Errorf ("expected %v, got %v" , e , a )
221
- }
222
-
223
- // last sync resource version should be the last version synced with store
397
+ // Validate that setLastSyncResourceVersion was called with the RV from the last event.
224
398
if e , a := "32" , g .LastSyncResourceVersion (); e != a {
225
399
t .Errorf ("expected %v, got %v" , e , a )
226
400
}
@@ -230,8 +404,8 @@ func TestReflectorStopWatch(t *testing.T) {
230
404
s := NewStore (MetaNamespaceKeyFunc )
231
405
g := NewReflector (& testLW {}, & v1.Pod {}, s , 0 )
232
406
fw := watch .NewFake ()
233
- stopWatch := make (chan struct {}, 1 )
234
- stopWatch <- struct {}{}
407
+ stopWatch := make (chan struct {})
408
+ close ( stopWatch )
235
409
err := watchHandler (time .Now (), fw , s , g .expectedType , g .expectedGVK , g .name , g .typeDescription , g .setLastSyncResourceVersion , nil , g .clock , nevererrc , stopWatch )
236
410
if err != errorStopRequested {
237
411
t .Errorf ("expected stop error, got %q" , err )
@@ -361,6 +535,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
361
535
}
362
536
}
363
537
watchRet , watchErr := item .events , item .watchErr
538
+ stopCh := make (chan struct {})
364
539
lw := & testLW {
365
540
WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
366
541
if watchErr != nil {
@@ -372,7 +547,13 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
372
547
for _ , e := range watchRet {
373
548
fw .Action (e .Type , e .Object )
374
549
}
375
- fw .Stop ()
550
+ // Because FakeWatcher doesn't buffer events, it's safe to
551
+ // close the stop channel immediately without missing events.
552
+ // But usually, the event producer would instead close the
553
+ // result channel, and wait for the consumer to stop the
554
+ // watcher, to avoid race conditions.
555
+ // TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh)
556
+ close (stopCh )
376
557
}()
377
558
return fw , nil
378
559
},
@@ -381,7 +562,16 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
381
562
},
382
563
}
383
564
r := NewReflector (lw , & v1.Pod {}, s , 0 )
384
- r .ListAndWatch (wait .NeverStop )
565
+ err := r .ListAndWatch (stopCh )
566
+ if item .listErr != nil && ! errors .Is (err , item .listErr ) {
567
+ t .Errorf ("unexpected ListAndWatch error: %v" , err )
568
+ }
569
+ if item .watchErr != nil && ! errors .Is (err , item .watchErr ) {
570
+ t .Errorf ("unexpected ListAndWatch error: %v" , err )
571
+ }
572
+ if item .listErr == nil && item .watchErr == nil {
573
+ assert .NoError (t , err )
574
+ }
385
575
}
386
576
}
387
577
0 commit comments