Skip to content

Commit d52ecd5

Browse files
authored
Merge pull request kubernetes#86430 from wojtek-t/avoid_thundering_herd_on_etcd
Avoid thundering herd of relists on etcd
2 parents 76c8964 + 5dcf08c commit d52ecd5

File tree

5 files changed

+197
-44
lines changed

5 files changed

+197
-44
lines changed

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ type Reflector struct {
7575
ShouldResync func() bool
7676
// clock allows tests to manipulate time
7777
clock clock.Clock
78+
// paginatedResult defines whether pagination should be forced for list calls.
79+
// It is set based on the result of the initial list call.
80+
paginatedResult bool
7881
// lastSyncResourceVersion is the resource version token last
7982
// observed when doing a sync with the underlying store
8083
// it is thread safe, but not synchronized with the underlying store
@@ -85,7 +88,12 @@ type Reflector struct {
8588
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
8689
lastSyncResourceVersionMutex sync.RWMutex
8790
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
88-
// Defaults to pager.PageSize.
91+
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
92+
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
93+
// it will turn off pagination to allow serving them from watch cache.
94+
// NOTE: It should be used carefully as paginated lists are always served directly from
95+
// etcd, which is significantly less efficient and may lead to serious performance and
96+
// scalability problems.
8997
WatchListPageSize int64
9098
}
9199

@@ -204,6 +212,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
204212
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
205213
defer initTrace.LogIfLong(10 * time.Second)
206214
var list runtime.Object
215+
var paginatedResult bool
207216
var err error
208217
listCh := make(chan struct{}, 1)
209218
panicCh := make(chan interface{}, 1)
@@ -218,19 +227,38 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
218227
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
219228
return r.listerWatcher.List(opts)
220229
}))
221-
if r.WatchListPageSize != 0 {
230+
switch {
231+
case r.WatchListPageSize != 0:
222232
pager.PageSize = r.WatchListPageSize
233+
case r.paginatedResult:
234+
// We got a paginated result initially. Assume this resource and server honor
235+
// paging requests (i.e. watch cache is probably disabled) and leave the default
236+
// pager size set.
237+
case options.ResourceVersion != "" && options.ResourceVersion != "0":
238+
// User didn't explicitly request pagination.
239+
//
240+
// With ResourceVersion != "", we have a possibility to list from watch cache,
241+
// but we do that (for ResourceVersion != "0") only if Limit is unset.
242+
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
243+
// switch off pagination to force listing from watch cache (if enabled).
244+
// With the existing semantic of RV (result is at least as fresh as provided RV),
245+
// this is correct and doesn't lead to going back in time.
246+
//
247+
// We also don't turn off pagination for ResourceVersion="0", since watch cache
248+
// is ignoring Limit in that case anyway, and if watch cache is not enabled
249+
// we don't introduce regression.
250+
pager.PageSize = 0
223251
}
224252

225-
list, err = pager.List(context.Background(), options)
253+
list, paginatedResult, err = pager.List(context.Background(), options)
226254
if isExpiredError(err) {
227255
r.setIsLastSyncResourceVersionExpired(true)
228256
// Retry immediately if the resource version used to list is expired.
229257
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
230258
// continuation pages, but the pager might not be enabled, or the full list might fail because the
231259
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
232260
// to recover and ensure the reflector makes forward progress.
233-
list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
261+
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
234262
}
235263
close(listCh)
236264
}()
@@ -244,6 +272,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
244272
if err != nil {
245273
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
246274
}
275+
276+
// We check if the list was paginated and if so set the paginatedResult based on that.
277+
// However, we want to do that only for the initial list (which is the only case
278+
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
279+
// situations we may force listing directly from etcd (by setting ResourceVersion="")
280+
// which will return paginated result, even if watch cache is enabled. However, in
281+
// that case, we still want to prefer sending requests to watch cache if possible.
282+
//
283+
// Paginated result returned for request with ResourceVersion="0" mean that watch
284+
// cache is disabled and there are a lot of objects of a given type. In such case,
285+
// there is no need to prefer listing from watch cache.
286+
if options.ResourceVersion == "0" && paginatedResult {
287+
r.paginatedResult = true
288+
}
289+
247290
r.setIsLastSyncResourceVersionExpired(false) // list was successful
248291
initTrace.Step("Objects listed")
249292
listMetaInterface, err := meta.ListAccessor(list)
@@ -320,7 +363,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
320363
if err != nil {
321364
switch {
322365
case isExpiredError(err):
323-
r.setIsLastSyncResourceVersionExpired(true)
366+
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
367+
// has a semantic that it returns data at least as fresh as provided RV.
368+
// So first try to LIST with setting RV to resource version of last observed object.
324369
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
325370
case err == io.EOF:
326371
// watch closed normally
@@ -344,8 +389,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
344389
if err != errorStopRequested {
345390
switch {
346391
case isExpiredError(err):
347-
r.setIsLastSyncResourceVersionExpired(true)
348-
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
392+
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
393+
// has a semantic that it returns data at least as fresh as provided RV.
394+
// So first try to LIST with setting RV to resource version of last observed object.
395+
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
349396
default:
350397
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
351398
}

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,8 @@ func TestReflectorWatchListPageSize(t *testing.T) {
425425
},
426426
}
427427
r := NewReflector(lw, &v1.Pod{}, s, 0)
428+
// Set resource version to test pagination also for not consistent reads.
429+
r.setLastSyncResourceVersion("10")
428430
// Set the reflector to paginate the list request in 4 item chunks.
429431
r.WatchListPageSize = 4
430432
r.ListAndWatch(stopCh)
@@ -435,6 +437,92 @@ func TestReflectorWatchListPageSize(t *testing.T) {
435437
}
436438
}
437439

440+
func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
441+
stopCh := make(chan struct{})
442+
s := NewStore(MetaNamespaceKeyFunc)
443+
444+
lw := &testLW{
445+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
446+
// Stop once the reflector begins watching since we're only interested in the list.
447+
close(stopCh)
448+
fw := watch.NewFake()
449+
return fw, nil
450+
},
451+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
452+
if options.ResourceVersion != "10" {
453+
t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion)
454+
}
455+
if options.Limit != 0 {
456+
t.Fatalf("Expected list Limit of 0 but got %d", options.Limit)
457+
}
458+
pods := make([]v1.Pod, 10)
459+
for i := 0; i < 10; i++ {
460+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
461+
}
462+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil
463+
},
464+
}
465+
r := NewReflector(lw, &v1.Pod{}, s, 0)
466+
r.setLastSyncResourceVersion("10")
467+
r.ListAndWatch(stopCh)
468+
469+
results := s.List()
470+
if len(results) != 10 {
471+
t.Errorf("Expected 10 results, got %d", len(results))
472+
}
473+
}
474+
475+
func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) {
476+
var stopCh chan struct{}
477+
s := NewStore(MetaNamespaceKeyFunc)
478+
479+
lw := &testLW{
480+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
481+
// Stop once the reflector begins watching since we're only interested in the list.
482+
close(stopCh)
483+
fw := watch.NewFake()
484+
return fw, nil
485+
},
486+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
487+
// Check that default pager limit is set.
488+
if options.Limit != 500 {
489+
t.Fatalf("Expected list Limit of 500 but got %d", options.Limit)
490+
}
491+
pods := make([]v1.Pod, 10)
492+
for i := 0; i < 10; i++ {
493+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
494+
}
495+
switch options.Continue {
496+
case "":
497+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
498+
case "C1":
499+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
500+
case "C2":
501+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
502+
default:
503+
t.Fatalf("Unrecognized continue: %s", options.Continue)
504+
}
505+
return nil, nil
506+
},
507+
}
508+
r := NewReflector(lw, &v1.Pod{}, s, 0)
509+
510+
// Initial list should initialize paginatedResult in the reflector.
511+
stopCh = make(chan struct{})
512+
r.ListAndWatch(stopCh)
513+
if results := s.List(); len(results) != 10 {
514+
t.Errorf("Expected 10 results, got %d", len(results))
515+
}
516+
517+
// Since initial list for ResourceVersion="0" was paginated, the subsequent
518+
// ones should also be paginated.
519+
stopCh = make(chan struct{})
520+
r.ListAndWatch(stopCh)
521+
if results := s.List(); len(results) != 10 {
522+
t.Errorf("Expected 10 results, got %d", len(results))
523+
}
524+
}
525+
438526
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
439527
// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or
440528
// etcd that is partitioned and serving older data than the reflector has already processed.

staging/src/k8s.io/client-go/tools/pager/pager.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,18 @@ func New(fn ListPageFunc) *ListPager {
7373
// List returns a single list object, but attempts to retrieve smaller chunks from the
7474
// server to reduce the impact on the server. If the chunk attempt fails, it will load
7575
// the full list instead. The Limit field on options, if unset, will default to the page size.
76-
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
76+
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
7777
if options.Limit == 0 {
7878
options.Limit = p.PageSize
7979
}
8080
requestedResourceVersion := options.ResourceVersion
8181
var list *metainternalversion.List
82+
paginatedResult := false
83+
8284
for {
8385
select {
8486
case <-ctx.Done():
85-
return nil, ctx.Err()
87+
return nil, paginatedResult, ctx.Err()
8688
default:
8789
}
8890

@@ -93,23 +95,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
9395
// failing when the resource versions is established by the first page request falls out of the compaction
9496
// during the subsequent list requests).
9597
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
96-
return nil, err
98+
return nil, paginatedResult, err
9799
}
98100
// the list expired while we were processing, fall back to a full list at
99101
// the requested ResourceVersion.
100102
options.Limit = 0
101103
options.Continue = ""
102104
options.ResourceVersion = requestedResourceVersion
103-
return p.PageFn(ctx, options)
105+
result, err := p.PageFn(ctx, options)
106+
return result, paginatedResult, err
104107
}
105108
m, err := meta.ListAccessor(obj)
106109
if err != nil {
107-
return nil, fmt.Errorf("returned object must be a list: %v", err)
110+
return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
108111
}
109112

110113
// exit early and return the object we got if we haven't processed any pages
111114
if len(m.GetContinue()) == 0 && list == nil {
112-
return obj, nil
115+
return obj, paginatedResult, nil
113116
}
114117

115118
// initialize the list and fill its contents
@@ -122,12 +125,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
122125
list.Items = append(list.Items, obj)
123126
return nil
124127
}); err != nil {
125-
return nil, err
128+
return nil, paginatedResult, err
126129
}
127130

128131
// if we have no more items, return the list
129132
if len(m.GetContinue()) == 0 {
130-
return list, nil
133+
return list, paginatedResult, nil
131134
}
132135

133136
// set the next loop up
@@ -136,6 +139,8 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
136139
// `specifying resource version is not allowed when using continue` error.
137140
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
138141
options.ResourceVersion = ""
142+
// At this point, result is already paginated.
143+
paginatedResult = true
139144
}
140145
}
141146

0 commit comments

Comments
 (0)