Skip to content

Commit 5dcf08c

Browse files
committed
Switch pager to return whether the result was paginated
1 parent 773d358 commit 5dcf08c

File tree

5 files changed

+146
-54
lines changed

5 files changed

+146
-54
lines changed

staging/src/k8s.io/client-go/tools/cache/reflector.go

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ type Reflector struct {
7575
ShouldResync func() bool
7676
// clock allows tests to manipulate time
7777
clock clock.Clock
78+
// paginatedResult defines whether pagination should be forced for list calls.
79+
// It is set based on the result of the initial list call.
80+
paginatedResult bool
7881
// lastSyncResourceVersion is the resource version token last
7982
// observed when doing a sync with the underlying store
8083
// it is thread safe, but not synchronized with the underlying store
@@ -209,6 +212,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
209212
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
210213
defer initTrace.LogIfLong(10 * time.Second)
211214
var list runtime.Object
215+
var paginatedResult bool
212216
var err error
213217
listCh := make(chan struct{}, 1)
214218
panicCh := make(chan interface{}, 1)
@@ -223,34 +227,38 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
223227
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
224228
return r.listerWatcher.List(opts)
225229
}))
226-
if r.WatchListPageSize != 0 {
230+
switch {
231+
case r.WatchListPageSize != 0:
227232
pager.PageSize = r.WatchListPageSize
228-
} else {
233+
case r.paginatedResult:
234+
// We got a paginated result initially. Assume this resource and server honor
235+
// paging requests (i.e. watch cache is probably disabled) and leave the default
236+
// pager size set.
237+
case options.ResourceVersion != "" && options.ResourceVersion != "0":
229238
// User didn't explicitly request pagination.
230-
if options.ResourceVersion != "" && options.ResourceVersion != "0" {
231-
// We also don't turn off pagination for ResourceVersion="0", since watch cache
232-
// is ignoring Limit in that case anyway, and if watchcache is not enabled we
233-
// don't introduce regression.
234-
235-
// With ResourceVersion != "", we have a possibility to list from watch cache,
236-
// but we do that (for ResourceVersion != "0") only if Limit is unset.
237-
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
238-
// switch off pagination to force listing from watch cache (if enabled).
239-
// With the existing semantic of RV (result is at least as fresh as provided RV),
240-
// this is correct and doesn't lead to going back in time.
241-
pager.PageSize = 0
242-
}
239+
//
240+
// With ResourceVersion != "", we have a possibility to list from watch cache,
241+
// but we do that (for ResourceVersion != "0") only if Limit is unset.
242+
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
243+
// switch off pagination to force listing from watch cache (if enabled).
244+
// With the existing semantic of RV (result is at least as fresh as provided RV),
245+
// this is correct and doesn't lead to going back in time.
246+
//
247+
// We also don't turn off pagination for ResourceVersion="0", since watch cache
248+
// is ignoring Limit in that case anyway, and if watch cache is not enabled
249+
// we don't introduce regression.
250+
pager.PageSize = 0
243251
}
244252

245-
list, err = pager.List(context.Background(), options)
253+
list, paginatedResult, err = pager.List(context.Background(), options)
246254
if isExpiredError(err) {
247255
r.setIsLastSyncResourceVersionExpired(true)
248256
// Retry immediately if the resource version used to list is expired.
249257
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
250258
// continuation pages, but the pager might not be enabled, or the full list might fail because the
251259
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
252260
// to recover and ensure the reflector makes forward progress.
253-
list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
261+
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
254262
}
255263
close(listCh)
256264
}()
@@ -264,6 +272,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
264272
if err != nil {
265273
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
266274
}
275+
276+
// We check if the list was paginated and if so set the paginatedResult based on that.
277+
// However, we want to do that only for the initial list (which is the only case
278+
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
279+
// situations we may force listing directly from etcd (by setting ResourceVersion="")
280+
// which will return paginated result, even if watch cache is enabled. However, in
281+
// that case, we still want to prefer sending requests to watch cache if possible.
282+
//
283+
// Paginated result returned for request with ResourceVersion="0" mean that watch
284+
// cache is disabled and there are a lot of objects of a given type. In such case,
285+
// there is no need to prefer listing from watch cache.
286+
if options.ResourceVersion == "0" && paginatedResult {
287+
r.paginatedResult = true
288+
}
289+
267290
r.setIsLastSyncResourceVersionExpired(false) // list was successful
268291
initTrace.Step("Objects listed")
269292
listMetaInterface, err := meta.ListAccessor(list)

staging/src/k8s.io/client-go/tools/cache/reflector_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,57 @@ func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
472472
}
473473
}
474474

475+
func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) {
476+
var stopCh chan struct{}
477+
s := NewStore(MetaNamespaceKeyFunc)
478+
479+
lw := &testLW{
480+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
481+
// Stop once the reflector begins watching since we're only interested in the list.
482+
close(stopCh)
483+
fw := watch.NewFake()
484+
return fw, nil
485+
},
486+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
487+
// Check that default pager limit is set.
488+
if options.Limit != 500 {
489+
t.Fatalf("Expected list Limit of 500 but got %d", options.Limit)
490+
}
491+
pods := make([]v1.Pod, 10)
492+
for i := 0; i < 10; i++ {
493+
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
494+
}
495+
switch options.Continue {
496+
case "":
497+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
498+
case "C1":
499+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
500+
case "C2":
501+
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
502+
default:
503+
t.Fatalf("Unrecognized continue: %s", options.Continue)
504+
}
505+
return nil, nil
506+
},
507+
}
508+
r := NewReflector(lw, &v1.Pod{}, s, 0)
509+
510+
// Initial list should initialize paginatedResult in the reflector.
511+
stopCh = make(chan struct{})
512+
r.ListAndWatch(stopCh)
513+
if results := s.List(); len(results) != 10 {
514+
t.Errorf("Expected 10 results, got %d", len(results))
515+
}
516+
517+
// Since initial list for ResourceVersion="0" was paginated, the subsequent
518+
// ones should also be paginated.
519+
stopCh = make(chan struct{})
520+
r.ListAndWatch(stopCh)
521+
if results := s.List(); len(results) != 10 {
522+
t.Errorf("Expected 10 results, got %d", len(results))
523+
}
524+
}
525+
475526
// TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
476527
// it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or
477528
// etcd that is partitioned and serving older data than the reflector has already processed.

staging/src/k8s.io/client-go/tools/pager/pager.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,18 @@ func New(fn ListPageFunc) *ListPager {
7373
// List returns a single list object, but attempts to retrieve smaller chunks from the
7474
// server to reduce the impact on the server. If the chunk attempt fails, it will load
7575
// the full list instead. The Limit field on options, if unset, will default to the page size.
76-
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
76+
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
7777
if options.Limit == 0 {
7878
options.Limit = p.PageSize
7979
}
8080
requestedResourceVersion := options.ResourceVersion
8181
var list *metainternalversion.List
82+
paginatedResult := false
83+
8284
for {
8385
select {
8486
case <-ctx.Done():
85-
return nil, ctx.Err()
87+
return nil, paginatedResult, ctx.Err()
8688
default:
8789
}
8890

@@ -93,23 +95,24 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
9395
// failing when the resource versions is established by the first page request falls out of the compaction
9496
// during the subsequent list requests).
9597
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
96-
return nil, err
98+
return nil, paginatedResult, err
9799
}
98100
// the list expired while we were processing, fall back to a full list at
99101
// the requested ResourceVersion.
100102
options.Limit = 0
101103
options.Continue = ""
102104
options.ResourceVersion = requestedResourceVersion
103-
return p.PageFn(ctx, options)
105+
result, err := p.PageFn(ctx, options)
106+
return result, paginatedResult, err
104107
}
105108
m, err := meta.ListAccessor(obj)
106109
if err != nil {
107-
return nil, fmt.Errorf("returned object must be a list: %v", err)
110+
return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
108111
}
109112

110113
// exit early and return the object we got if we haven't processed any pages
111114
if len(m.GetContinue()) == 0 && list == nil {
112-
return obj, nil
115+
return obj, paginatedResult, nil
113116
}
114117

115118
// initialize the list and fill its contents
@@ -122,12 +125,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
122125
list.Items = append(list.Items, obj)
123126
return nil
124127
}); err != nil {
125-
return nil, err
128+
return nil, paginatedResult, err
126129
}
127130

128131
// if we have no more items, return the list
129132
if len(m.GetContinue()) == 0 {
130-
return list, nil
133+
return list, paginatedResult, nil
131134
}
132135

133136
// set the next loop up
@@ -136,6 +139,8 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
136139
// `specifying resource version is not allowed when using continue` error.
137140
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
138141
options.ResourceVersion = ""
142+
// At this point, result is already paginated.
143+
paginatedResult = true
139144
}
140145
}
141146

staging/src/k8s.io/client-go/tools/pager/pager_test.go

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options
120120
}
121121
return p.PagedList(ctx, options)
122122
}
123+
123124
func TestListPager_List(t *testing.T) {
124125
type fields struct {
125126
PageSize int64
@@ -135,43 +136,50 @@ func TestListPager_List(t *testing.T) {
135136
fields fields
136137
args args
137138
want runtime.Object
139+
wantPaged bool
138140
wantErr bool
139141
isExpired bool
140142
}{
141143
{
142-
name: "empty page",
143-
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
144-
args: args{},
145-
want: list(0, "rv:20"),
144+
name: "empty page",
145+
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
146+
args: args{},
147+
want: list(0, "rv:20"),
148+
wantPaged: false,
146149
},
147150
{
148-
name: "one page",
149-
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
150-
args: args{},
151-
want: list(9, "rv:20"),
151+
name: "one page",
152+
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
153+
args: args{},
154+
want: list(9, "rv:20"),
155+
wantPaged: false,
152156
},
153157
{
154-
name: "one full page",
155-
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
156-
args: args{},
157-
want: list(10, "rv:20"),
158+
name: "one full page",
159+
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
160+
args: args{},
161+
want: list(10, "rv:20"),
162+
wantPaged: false,
158163
},
159164
{
160-
name: "two pages",
161-
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
162-
args: args{},
163-
want: list(11, "rv:20"),
165+
name: "two pages",
166+
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
167+
args: args{},
168+
want: list(11, "rv:20"),
169+
wantPaged: true,
164170
},
165171
{
166-
name: "three pages",
167-
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
168-
args: args{},
169-
want: list(21, "rv:20"),
172+
name: "three pages",
173+
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
174+
args: args{},
175+
want: list(21, "rv:20"),
176+
wantPaged: true,
170177
},
171178
{
172179
name: "expires on second page",
173180
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
174181
args: args{},
182+
wantPaged: true,
175183
wantErr: true,
176184
isExpired: true,
177185
},
@@ -182,14 +190,16 @@ func TestListPager_List(t *testing.T) {
182190
PageSize: 10,
183191
PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList,
184192
},
185-
args: args{},
186-
want: list(21, "rv:20"),
193+
args: args{},
194+
want: list(21, "rv:20"),
195+
wantPaged: true,
187196
},
188197
{
189-
name: "two pages with resourceVersion",
190-
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
191-
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
192-
want: list(11, "rv:20"),
198+
name: "two pages with resourceVersion",
199+
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
200+
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
201+
want: list(11, "rv:20"),
202+
wantPaged: true,
193203
},
194204
}
195205
for _, tt := range tests {
@@ -203,7 +213,7 @@ func TestListPager_List(t *testing.T) {
203213
if ctx == nil {
204214
ctx = context.Background()
205215
}
206-
got, err := p.List(ctx, tt.args.options)
216+
got, paginatedResult, err := p.List(ctx, tt.args.options)
207217
if (err != nil) != tt.wantErr {
208218
t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
209219
return
@@ -212,6 +222,9 @@ func TestListPager_List(t *testing.T) {
212222
t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired)
213223
return
214224
}
225+
if tt.wantPaged != paginatedResult {
226+
t.Errorf("paginatedResult = %t, want %t", paginatedResult, tt.wantPaged)
227+
}
215228
if !reflect.DeepEqual(got, tt.want) {
216229
t.Errorf("ListPager.List() = %v, want %v", got, tt.want)
217230
}

test/integration/apiserver/apiserver_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func TestListResourceVersion0(t *testing.T) {
304304

305305
p := pager.New(pager.SimplePageFunc(pagerFn))
306306
p.PageSize = 3
307-
listObj, err := p.List(context.Background(), metav1.ListOptions{ResourceVersion: "0"})
307+
listObj, _, err := p.List(context.Background(), metav1.ListOptions{ResourceVersion: "0"})
308308
if err != nil {
309309
t.Fatalf("Unexpected list error: %v", err)
310310
}
@@ -360,7 +360,7 @@ func TestAPIListChunking(t *testing.T) {
360360
return list, err
361361
}),
362362
}
363-
listObj, err := p.List(context.Background(), metav1.ListOptions{})
363+
listObj, _, err := p.List(context.Background(), metav1.ListOptions{})
364364
if err != nil {
365365
t.Fatal(err)
366366
}

0 commit comments

Comments
 (0)