Skip to content

Commit 169a952

Browse files
authored
Merge pull request kubernetes#125302 from karlkfi/karl-informer-watcher-test
Update TestNewInformerWatcher for WatchListClient
2 parents cf4e4c0 + 28e3a72 commit 169a952

File tree

1 file changed

+182
-83
lines changed

1 file changed

+182
-83
lines changed

staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go

Lines changed: 182 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ import (
3232
"k8s.io/apimachinery/pkg/runtime"
3333
"k8s.io/apimachinery/pkg/runtime/schema"
3434
"k8s.io/apimachinery/pkg/util/dump"
35+
"k8s.io/apimachinery/pkg/util/wait"
3536
"k8s.io/apimachinery/pkg/watch"
37+
clientfeatures "k8s.io/client-go/features"
38+
clientfeaturestesting "k8s.io/client-go/features/testing"
3639
fakeclientset "k8s.io/client-go/kubernetes/fake"
3740
testcore "k8s.io/client-go/testing"
3841
"k8s.io/client-go/tools/cache"
@@ -134,96 +137,180 @@ func (a byEventTypeAndName) Less(i, j int) bool {
134137
return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
135138
}
136139

140+
func newTestSecret(name, key, value string) *corev1.Secret {
141+
return &corev1.Secret{
142+
ObjectMeta: metav1.ObjectMeta{
143+
Name: name,
144+
},
145+
StringData: map[string]string{
146+
key: value,
147+
},
148+
}
149+
}
150+
137151
func TestNewInformerWatcher(t *testing.T) {
138152
// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
139153
tt := []struct {
140-
name string
141-
objects []runtime.Object
142-
events []watch.Event
154+
name string
155+
watchListFeatureEnabled bool
156+
objects []runtime.Object
157+
inputEvents []watch.Event
158+
outputEvents []watch.Event
143159
}{
144160
{
145-
name: "basic test",
161+
name: "WatchListClient feature disabled",
162+
watchListFeatureEnabled: false,
146163
objects: []runtime.Object{
147-
&corev1.Secret{
148-
ObjectMeta: metav1.ObjectMeta{
149-
Name: "pod-1",
150-
},
151-
StringData: map[string]string{
152-
"foo-1": "initial",
153-
},
164+
newTestSecret("pod-1", "foo-1", "initial"),
165+
newTestSecret("pod-2", "foo-2", "initial"),
166+
newTestSecret("pod-3", "foo-3", "initial"),
167+
},
168+
inputEvents: []watch.Event{
169+
{
170+
Type: watch.Added,
171+
Object: newTestSecret("pod-4", "foo-4", "initial"),
154172
},
155-
&corev1.Secret{
156-
ObjectMeta: metav1.ObjectMeta{
157-
Name: "pod-2",
158-
},
159-
StringData: map[string]string{
160-
"foo-2": "initial",
161-
},
173+
{
174+
Type: watch.Modified,
175+
Object: newTestSecret("pod-2", "foo-2", "new"),
162176
},
163-
&corev1.Secret{
164-
ObjectMeta: metav1.ObjectMeta{
165-
Name: "pod-3",
166-
},
167-
StringData: map[string]string{
168-
"foo-3": "initial",
169-
},
177+
{
178+
Type: watch.Deleted,
179+
Object: newTestSecret("pod-3", "foo-3", "initial"),
170180
},
171181
},
172-
events: []watch.Event{
182+
outputEvents: []watch.Event{
183+
// When WatchListClient is disabled, ListAndWatch creates fake
184+
// ADDED events for each object listed.
173185
{
174-
Type: watch.Added,
175-
Object: &corev1.Secret{
176-
ObjectMeta: metav1.ObjectMeta{
177-
Name: "pod-4",
178-
},
179-
StringData: map[string]string{
180-
"foo-4": "initial",
181-
},
182-
},
186+
Type: watch.Added,
187+
Object: newTestSecret("pod-1", "foo-1", "initial"),
183188
},
184189
{
185-
Type: watch.Modified,
186-
Object: &corev1.Secret{
187-
ObjectMeta: metav1.ObjectMeta{
188-
Name: "pod-2",
189-
},
190-
StringData: map[string]string{
191-
"foo-2": "new",
192-
},
193-
},
190+
Type: watch.Added,
191+
Object: newTestSecret("pod-2", "foo-2", "initial"),
192+
},
193+
{
194+
Type: watch.Added,
195+
Object: newTestSecret("pod-3", "foo-3", "initial"),
196+
},
197+
// Normal events follow.
198+
{
199+
Type: watch.Added,
200+
Object: newTestSecret("pod-4", "foo-4", "initial"),
201+
},
202+
{
203+
Type: watch.Modified,
204+
Object: newTestSecret("pod-2", "foo-2", "new"),
205+
},
206+
{
207+
Type: watch.Deleted,
208+
Object: newTestSecret("pod-3", "foo-3", "initial"),
209+
},
210+
},
211+
},
212+
{
213+
name: "WatchListClient feature enabled",
214+
watchListFeatureEnabled: true,
215+
objects: []runtime.Object{
216+
newTestSecret("pod-1", "foo-1", "initial"),
217+
newTestSecret("pod-2", "foo-2", "initial"),
218+
newTestSecret("pod-3", "foo-3", "initial"),
219+
},
220+
inputEvents: []watch.Event{
221+
{
222+
Type: watch.Added,
223+
Object: newTestSecret("pod-1", "foo-1", "initial"),
224+
},
225+
{
226+
Type: watch.Added,
227+
Object: newTestSecret("pod-2", "foo-2", "initial"),
194228
},
195229
{
196-
Type: watch.Deleted,
230+
Type: watch.Added,
231+
Object: newTestSecret("pod-3", "foo-3", "initial"),
232+
},
233+
// ListWatch bookmark indicates that initial listing is done
234+
{
235+
Type: watch.Bookmark,
197236
Object: &corev1.Secret{
198237
ObjectMeta: metav1.ObjectMeta{
199-
Name: "pod-3",
238+
Annotations: map[string]string{
239+
metav1.InitialEventsAnnotationKey: "true",
240+
},
200241
},
201242
},
202243
},
244+
{
245+
Type: watch.Added,
246+
Object: newTestSecret("pod-4", "foo-4", "initial"),
247+
},
248+
{
249+
Type: watch.Modified,
250+
Object: newTestSecret("pod-2", "foo-2", "new"),
251+
},
252+
{
253+
Type: watch.Deleted,
254+
Object: newTestSecret("pod-3", "foo-3", "initial"),
255+
},
256+
},
257+
outputEvents: []watch.Event{
258+
// When WatchListClient is enabled, WatchList receives
259+
// ADDED events from the server for each existing object.
260+
{
261+
Type: watch.Added,
262+
Object: newTestSecret("pod-1", "foo-1", "initial"),
263+
},
264+
{
265+
Type: watch.Added,
266+
Object: newTestSecret("pod-2", "foo-2", "initial"),
267+
},
268+
{
269+
Type: watch.Added,
270+
Object: newTestSecret("pod-3", "foo-3", "initial"),
271+
},
272+
// Bookmark event at the end of listing is not sent to the client.
273+
// Normal events follow.
274+
{
275+
Type: watch.Added,
276+
Object: newTestSecret("pod-4", "foo-4", "initial"),
277+
},
278+
{
279+
Type: watch.Modified,
280+
Object: newTestSecret("pod-2", "foo-2", "new"),
281+
},
282+
{
283+
Type: watch.Deleted,
284+
Object: newTestSecret("pod-3", "foo-3", "initial"),
285+
},
203286
},
204287
},
205288
}
206289

207290
for _, tc := range tt {
208291
t.Run(tc.name, func(t *testing.T) {
209-
var expected []watch.Event
210-
for _, o := range tc.objects {
211-
expected = append(expected, watch.Event{
212-
Type: watch.Added,
213-
Object: o.DeepCopyObject(),
214-
})
215-
}
216-
for _, e := range tc.events {
217-
expected = append(expected, *e.DeepCopy())
218-
}
292+
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, tc.watchListFeatureEnabled)
219293

220294
fake := fakeclientset.NewSimpleClientset(tc.objects...)
221-
fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
222-
fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))
223-
224-
for _, e := range tc.events {
225-
fakeWatch.Action(e.Type, e.Object)
226-
}
295+
inputCh := make(chan watch.Event)
296+
inputWatcher := watch.NewProxyWatcher(inputCh)
297+
// Indexer should stop the input watcher when the output watcher is stopped.
298+
// But stop it at the end of the test, just in case.
299+
defer inputWatcher.Stop()
300+
inputStopCh := inputWatcher.StopChan()
301+
fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(inputWatcher, nil))
302+
// Send events and then close the done channel
303+
inputDoneCh := make(chan struct{})
304+
go func() {
305+
defer close(inputDoneCh)
306+
for _, e := range tc.inputEvents {
307+
select {
308+
case inputCh <- e:
309+
case <-inputStopCh:
310+
return
311+
}
312+
}
313+
}()
227314

228315
lw := &cache.ListWatch{
229316
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@@ -233,46 +320,58 @@ func TestNewInformerWatcher(t *testing.T) {
233320
return fake.CoreV1().Secrets("").Watch(context.TODO(), options)
234321
},
235322
}
236-
_, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
237-
323+
_, _, outputWatcher, informerDoneCh := NewIndexerInformerWatcher(lw, &corev1.Secret{})
324+
outputCh := outputWatcher.ResultChan()
325+
timeoutCh := time.After(wait.ForeverTestTimeout)
238326
var result []watch.Event
239327
loop:
240328
for {
241-
var event watch.Event
242-
var ok bool
243329
select {
244-
case event, ok = <-w.ResultChan():
330+
case event, ok := <-outputCh:
245331
if !ok {
246-
t.Errorf("Failed to read event: channel is already closed!")
247-
return
332+
t.Errorf("Output result channel closed prematurely")
333+
break loop
248334
}
249-
250335
result = append(result, *event.DeepCopy())
251-
case <-time.After(time.Second * 1):
252-
// All the events are buffered -> this means we are done
253-
// Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event
336+
if len(result) >= len(tc.outputEvents) {
337+
break loop
338+
}
339+
case <-timeoutCh:
340+
t.Error("Timed out waiting for events")
254341
break loop
255342
}
256343
}
257344

258-
// Informers don't guarantee event order so we need to sort these arrays to compare them
259-
sort.Sort(byEventTypeAndName(expected))
345+
// Informers don't guarantee event order so we need to sort these arrays to compare them.
346+
sort.Sort(byEventTypeAndName(tc.outputEvents))
260347
sort.Sort(byEventTypeAndName(result))
261348

262-
if !reflect.DeepEqual(expected, result) {
263-
t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(expected), dump.Pretty(result), cmp.Diff(expected, result))
349+
if !reflect.DeepEqual(tc.outputEvents, result) {
350+
t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(tc.outputEvents), dump.Pretty(result), cmp.Diff(tc.outputEvents, result))
264351
return
265352
}
266353

267-
// Fill in some data to test watch closing while there are some events to be read
268-
for _, e := range tc.events {
269-
fakeWatch.Action(e.Type, e.Object)
270-
}
354+
// Send some more events, but don't read them.
355+
// Stop producing events when the consumer stops the watcher.
356+
go func() {
357+
defer close(inputCh)
358+
for _, e := range tc.inputEvents {
359+
select {
360+
case inputCh <- e:
361+
case <-inputStopCh:
362+
return
363+
}
364+
}
365+
}()
271366

272367
// Stop before reading all the data to make sure the informer can deal with closed channel
273-
w.Stop()
368+
outputWatcher.Stop()
274369

275-
<-done
370+
select {
371+
case <-informerDoneCh:
372+
case <-time.After(wait.ForeverTestTimeout):
373+
t.Error("Timed out waiting for informer to cleanup")
374+
}
276375
})
277376
}
278377

0 commit comments

Comments
 (0)