Skip to content

Commit 773d358

Browse files
committed
Avoid thundering herd on etcd on masters upgrade
1 parent 1bb68a2 commit 773d358

File tree

2 files changed

+65
-4
lines changed

2 files changed

+65
-4
lines changed

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,12 @@ type Reflector struct {
8585
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
8686
lastSyncResourceVersionMutex sync.RWMutex
8787
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
88-
// Defaults to pager.PageSize.
88+
// If unset, for consistent reads (RV="") or reads that opt into arbitrarily old data
89+
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
90+
// it will turn off pagination to allow serving them from watch cache.
91+
// NOTE: It should be used carefully as paginated lists are always served directly from
92+
// etcd, which is significantly less efficient and may lead to serious performance and
93+
// scalability problems.
8994
WatchListPageSize int64
9095
}
9196

@@ -220,6 +225,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
220225
}))
221226
if r.WatchListPageSize != 0 {
222227
pager.PageSize = r.WatchListPageSize
228+
} else {
229+
// User didn't explicitly request pagination.
230+
if options.ResourceVersion != "" && options.ResourceVersion != "0" {
231+
// We also don't turn off pagination for ResourceVersion="0", since the watch cache
232+
// ignores Limit in that case anyway, and if the watch cache is not enabled we
233+
// don't introduce a regression.
234+
235+
// With ResourceVersion != "", we have a possibility to list from watch cache,
236+
// but we do that (for ResourceVersion != "0") only if Limit is unset.
237+
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
238+
// switch off pagination to force listing from watch cache (if enabled).
239+
// With the existing semantics of RV (the result is at least as fresh as the provided RV),
240+
// this is correct and doesn't lead to going back in time.
241+
pager.PageSize = 0
242+
}
223243
}
224244

225245
list, err = pager.List(context.Background(), options)
@@ -320,7 +340,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
320340
if err != nil {
321341
switch {
322342
case isExpiredError(err):
323-
r.setIsLastSyncResourceVersionExpired(true)
343+
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
344+
// has the semantics of returning data at least as fresh as the provided RV.
345+
// So first try to LIST with RV set to the resource version of the last observed object.
324346
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
325347
case err == io.EOF:
326348
// watch closed normally
@@ -344,8 +366,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
344366
if err != errorStopRequested {
345367
switch {
346368
case isExpiredError(err):
347-
r.setIsLastSyncResourceVersionExpired(true)
348-
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
369+
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
370+
// has the semantics of returning data at least as fresh as the provided RV.
371+
// So first try to LIST with RV set to the resource version of the last observed object.
372+
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
349373
default:
350374
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
351375
}

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,8 @@ func TestReflectorWatchListPageSize(t *testing.T) {
425425
},
426426
}
427427
r := NewReflector(lw, &v1.Pod{}, s, 0)
428+
// Set the resource version so that pagination is also tested for non-consistent reads.
429+
r.setLastSyncResourceVersion("10")
428430
// Set the reflector to paginate the list request in 4 item chunks.
429431
r.WatchListPageSize = 4
430432
r.ListAndWatch(stopCh)
@@ -435,6 +437,41 @@ func TestReflectorWatchListPageSize(t *testing.T) {
435437
}
436438
}
437439

440+
func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
441+
stopCh := make(chan struct{})
442+
s := NewStore(MetaNamespaceKeyFunc)
443+
444+
lw := &testLW{
445+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
446+
// Stop once the reflector begins watching since we're only interested in the list.
447+
close(stopCh)
448+
fw := watch.NewFake()
449+
return fw, nil
450+
},
451+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
452+
if options.ResourceVersion != "10" {
453+
t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion)
454+
}
455+
if options.Limit != 0 {
456+
t.Fatalf("Expected list Limit of 0 but got %d", options.Limit)
457+
}
458+
pods := make([]v1.Pod, 10)
459+
for i := 0; i < 10; i++ {
460+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
461+
}
462+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil
463+
},
464+
}
465+
r := NewReflector(lw, &v1.Pod{}, s, 0)
466+
r.setLastSyncResourceVersion("10")
467+
r.ListAndWatch(stopCh)
468+
469+
results := s.List()
470+
if len(results) != 10 {
471+
t.Errorf("Expected 10 results, got %d", len(results))
472+
}
473+
}
474+
438475
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
439476
// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or
440477
// etcd that is partitioned and serving older data than the reflector has already processed.

0 commit comments

Comments
 (0)