@@ -32,7 +32,10 @@ import (
32
32
"k8s.io/apimachinery/pkg/runtime"
33
33
"k8s.io/apimachinery/pkg/runtime/schema"
34
34
"k8s.io/apimachinery/pkg/util/dump"
35
+ "k8s.io/apimachinery/pkg/util/wait"
35
36
"k8s.io/apimachinery/pkg/watch"
37
+ clientfeatures "k8s.io/client-go/features"
38
+ clientfeaturestesting "k8s.io/client-go/features/testing"
36
39
fakeclientset "k8s.io/client-go/kubernetes/fake"
37
40
testcore "k8s.io/client-go/testing"
38
41
"k8s.io/client-go/tools/cache"
@@ -134,96 +137,180 @@ func (a byEventTypeAndName) Less(i, j int) bool {
134
137
return a [i ].Object .(* corev1.Secret ).Name < a [j ].Object .(* corev1.Secret ).Name
135
138
}
136
139
140
+ func newTestSecret (name , key , value string ) * corev1.Secret {
141
+ return & corev1.Secret {
142
+ ObjectMeta : metav1.ObjectMeta {
143
+ Name : name ,
144
+ },
145
+ StringData : map [string ]string {
146
+ key : value ,
147
+ },
148
+ }
149
+ }
150
+
137
151
func TestNewInformerWatcher (t * testing.T ) {
138
152
// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
139
153
tt := []struct {
140
- name string
141
- objects []runtime.Object
142
- events []watch.Event
154
+ name string
155
+ watchListFeatureEnabled bool
156
+ objects []runtime.Object
157
+ inputEvents []watch.Event
158
+ outputEvents []watch.Event
143
159
}{
144
160
{
145
- name : "basic test" ,
161
+ name : "WatchListClient feature disabled" ,
162
+ watchListFeatureEnabled : false ,
146
163
objects : []runtime.Object {
147
- & corev1.Secret {
148
- ObjectMeta : metav1.ObjectMeta {
149
- Name : "pod-1" ,
150
- },
151
- StringData : map [string ]string {
152
- "foo-1" : "initial" ,
153
- },
164
+ newTestSecret ("pod-1" , "foo-1" , "initial" ),
165
+ newTestSecret ("pod-2" , "foo-2" , "initial" ),
166
+ newTestSecret ("pod-3" , "foo-3" , "initial" ),
167
+ },
168
+ inputEvents : []watch.Event {
169
+ {
170
+ Type : watch .Added ,
171
+ Object : newTestSecret ("pod-4" , "foo-4" , "initial" ),
154
172
},
155
- & corev1.Secret {
156
- ObjectMeta : metav1.ObjectMeta {
157
- Name : "pod-2" ,
158
- },
159
- StringData : map [string ]string {
160
- "foo-2" : "initial" ,
161
- },
173
+ {
174
+ Type : watch .Modified ,
175
+ Object : newTestSecret ("pod-2" , "foo-2" , "new" ),
162
176
},
163
- & corev1.Secret {
164
- ObjectMeta : metav1.ObjectMeta {
165
- Name : "pod-3" ,
166
- },
167
- StringData : map [string ]string {
168
- "foo-3" : "initial" ,
169
- },
177
+ {
178
+ Type : watch .Deleted ,
179
+ Object : newTestSecret ("pod-3" , "foo-3" , "initial" ),
170
180
},
171
181
},
172
- events : []watch.Event {
182
+ outputEvents : []watch.Event {
183
+ // When WatchListClient is disabled, ListAndWatch creates fake
184
+ // ADDED events for each object listed.
173
185
{
174
- Type : watch .Added ,
175
- Object : & corev1.Secret {
176
- ObjectMeta : metav1.ObjectMeta {
177
- Name : "pod-4" ,
178
- },
179
- StringData : map [string ]string {
180
- "foo-4" : "initial" ,
181
- },
182
- },
186
+ Type : watch .Added ,
187
+ Object : newTestSecret ("pod-1" , "foo-1" , "initial" ),
183
188
},
184
189
{
185
- Type : watch .Modified ,
186
- Object : & corev1.Secret {
187
- ObjectMeta : metav1.ObjectMeta {
188
- Name : "pod-2" ,
189
- },
190
- StringData : map [string ]string {
191
- "foo-2" : "new" ,
192
- },
193
- },
190
+ Type : watch .Added ,
191
+ Object : newTestSecret ("pod-2" , "foo-2" , "initial" ),
192
+ },
193
+ {
194
+ Type : watch .Added ,
195
+ Object : newTestSecret ("pod-3" , "foo-3" , "initial" ),
196
+ },
197
+ // Normal events follow.
198
+ {
199
+ Type : watch .Added ,
200
+ Object : newTestSecret ("pod-4" , "foo-4" , "initial" ),
201
+ },
202
+ {
203
+ Type : watch .Modified ,
204
+ Object : newTestSecret ("pod-2" , "foo-2" , "new" ),
205
+ },
206
+ {
207
+ Type : watch .Deleted ,
208
+ Object : newTestSecret ("pod-3" , "foo-3" , "initial" ),
209
+ },
210
+ },
211
+ },
212
+ {
213
+ name : "WatchListClient feature enabled" ,
214
+ watchListFeatureEnabled : true ,
215
+ objects : []runtime.Object {
216
+ newTestSecret ("pod-1" , "foo-1" , "initial" ),
217
+ newTestSecret ("pod-2" , "foo-2" , "initial" ),
218
+ newTestSecret ("pod-3" , "foo-3" , "initial" ),
219
+ },
220
+ inputEvents : []watch.Event {
221
+ {
222
+ Type : watch .Added ,
223
+ Object : newTestSecret ("pod-1" , "foo-1" , "initial" ),
224
+ },
225
+ {
226
+ Type : watch .Added ,
227
+ Object : newTestSecret ("pod-2" , "foo-2" , "initial" ),
194
228
},
195
229
{
196
- Type : watch .Deleted ,
230
+ Type : watch .Added ,
231
+ Object : newTestSecret ("pod-3" , "foo-3" , "initial" ),
232
+ },
233
+ // ListWatch bookmark indicates that initial listing is done
234
+ {
235
+ Type : watch .Bookmark ,
197
236
Object : & corev1.Secret {
198
237
ObjectMeta : metav1.ObjectMeta {
199
- Name : "pod-3" ,
238
+ Annotations : map [string ]string {
239
+ metav1 .InitialEventsAnnotationKey : "true" ,
240
+ },
200
241
},
201
242
},
202
243
},
244
+ {
245
+ Type : watch .Added ,
246
+ Object : newTestSecret ("pod-4" , "foo-4" , "initial" ),
247
+ },
248
+ {
249
+ Type : watch .Modified ,
250
+ Object : newTestSecret ("pod-2" , "foo-2" , "new" ),
251
+ },
252
+ {
253
+ Type : watch .Deleted ,
254
+ Object : newTestSecret ("pod-3" , "foo-3" , "initial" ),
255
+ },
256
+ },
257
+ outputEvents : []watch.Event {
258
+ // When WatchListClient is enabled, WatchList receives
259
+ // ADDED events from the server for each existing object.
260
+ {
261
+ Type : watch .Added ,
262
+ Object : newTestSecret ("pod-1" , "foo-1" , "initial" ),
263
+ },
264
+ {
265
+ Type : watch .Added ,
266
+ Object : newTestSecret ("pod-2" , "foo-2" , "initial" ),
267
+ },
268
+ {
269
+ Type : watch .Added ,
270
+ Object : newTestSecret ("pod-3" , "foo-3" , "initial" ),
271
+ },
272
+ // Bookmark event at the end of listing is not sent to the client.
273
+ // Normal events follow.
274
+ {
275
+ Type : watch .Added ,
276
+ Object : newTestSecret ("pod-4" , "foo-4" , "initial" ),
277
+ },
278
+ {
279
+ Type : watch .Modified ,
280
+ Object : newTestSecret ("pod-2" , "foo-2" , "new" ),
281
+ },
282
+ {
283
+ Type : watch .Deleted ,
284
+ Object : newTestSecret ("pod-3" , "foo-3" , "initial" ),
285
+ },
203
286
},
204
287
},
205
288
}
206
289
207
290
for _ , tc := range tt {
208
291
t .Run (tc .name , func (t * testing.T ) {
209
- var expected []watch.Event
210
- for _ , o := range tc .objects {
211
- expected = append (expected , watch.Event {
212
- Type : watch .Added ,
213
- Object : o .DeepCopyObject (),
214
- })
215
- }
216
- for _ , e := range tc .events {
217
- expected = append (expected , * e .DeepCopy ())
218
- }
292
+ clientfeaturestesting .SetFeatureDuringTest (t , clientfeatures .WatchListClient , tc .watchListFeatureEnabled )
219
293
220
294
fake := fakeclientset .NewSimpleClientset (tc .objects ... )
221
- fakeWatch := watch .NewFakeWithChanSize (len (tc .events ), false )
222
- fake .PrependWatchReactor ("secrets" , testcore .DefaultWatchReactor (fakeWatch , nil ))
223
-
224
- for _ , e := range tc .events {
225
- fakeWatch .Action (e .Type , e .Object )
226
- }
295
+ inputCh := make (chan watch.Event )
296
+ inputWatcher := watch .NewProxyWatcher (inputCh )
297
+ // Indexer should stop the input watcher when the output watcher is stopped.
298
+ // But stop it at the end of the test, just in case.
299
+ defer inputWatcher .Stop ()
300
+ inputStopCh := inputWatcher .StopChan ()
301
+ fake .PrependWatchReactor ("secrets" , testcore .DefaultWatchReactor (inputWatcher , nil ))
302
+ // Send events and then close the done channel
303
+ inputDoneCh := make (chan struct {})
304
+ go func () {
305
+ defer close (inputDoneCh )
306
+ for _ , e := range tc .inputEvents {
307
+ select {
308
+ case inputCh <- e :
309
+ case <- inputStopCh :
310
+ return
311
+ }
312
+ }
313
+ }()
227
314
228
315
lw := & cache.ListWatch {
229
316
ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
@@ -233,46 +320,58 @@ func TestNewInformerWatcher(t *testing.T) {
233
320
return fake .CoreV1 ().Secrets ("" ).Watch (context .TODO (), options )
234
321
},
235
322
}
236
- _ , _ , w , done := NewIndexerInformerWatcher (lw , & corev1.Secret {})
237
-
323
+ _ , _ , outputWatcher , informerDoneCh := NewIndexerInformerWatcher (lw , & corev1.Secret {})
324
+ outputCh := outputWatcher .ResultChan ()
325
+ timeoutCh := time .After (wait .ForeverTestTimeout )
238
326
var result []watch.Event
239
327
loop:
240
328
for {
241
- var event watch.Event
242
- var ok bool
243
329
select {
244
- case event , ok = <- w . ResultChan () :
330
+ case event , ok : = <- outputCh :
245
331
if ! ok {
246
- t .Errorf ("Failed to read event: channel is already closed! " )
247
- return
332
+ t .Errorf ("Output result channel closed prematurely " )
333
+ break loop
248
334
}
249
-
250
335
result = append (result , * event .DeepCopy ())
251
- case <- time .After (time .Second * 1 ):
252
- // All the events are buffered -> this means we are done
253
- // Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event
336
+ if len (result ) >= len (tc .outputEvents ) {
337
+ break loop
338
+ }
339
+ case <- timeoutCh :
340
+ t .Error ("Timed out waiting for events" )
254
341
break loop
255
342
}
256
343
}
257
344
258
- // Informers don't guarantee event order so we need to sort these arrays to compare them
259
- sort .Sort (byEventTypeAndName (expected ))
345
+ // Informers don't guarantee event order so we need to sort these arrays to compare them.
346
+ sort .Sort (byEventTypeAndName (tc . outputEvents ))
260
347
sort .Sort (byEventTypeAndName (result ))
261
348
262
- if ! reflect .DeepEqual (expected , result ) {
263
- t .Errorf ("\n expected: %s,\n got: %s,\n diff: %s" , dump .Pretty (expected ), dump .Pretty (result ), cmp .Diff (expected , result ))
349
+ if ! reflect .DeepEqual (tc . outputEvents , result ) {
350
+ t .Errorf ("\n expected: %s,\n got: %s,\n diff: %s" , dump .Pretty (tc . outputEvents ), dump .Pretty (result ), cmp .Diff (tc . outputEvents , result ))
264
351
return
265
352
}
266
353
267
- // Fill in some data to test watch closing while there are some events to be read
268
- for _ , e := range tc .events {
269
- fakeWatch .Action (e .Type , e .Object )
270
- }
354
+ // Send some more events, but don't read them.
355
+ // Stop producing events when the consumer stops the watcher.
356
+ go func () {
357
+ defer close (inputCh )
358
+ for _ , e := range tc .inputEvents {
359
+ select {
360
+ case inputCh <- e :
361
+ case <- inputStopCh :
362
+ return
363
+ }
364
+ }
365
+ }()
271
366
272
367
// Stop before reading all the data to make sure the informer can deal with closed channel
273
- w .Stop ()
368
+ outputWatcher .Stop ()
274
369
275
- <- done
370
+ select {
371
+ case <- informerDoneCh :
372
+ case <- time .After (wait .ForeverTestTimeout ):
373
+ t .Error ("Timed out waiting for informer to cleanup" )
374
+ }
276
375
})
277
376
}
278
377
0 commit comments