Skip to content

Commit a5fa288

Browse files
authored
Merge pull request #567 from karlkfi/karl-list-limit
feat: pagninate status poller lists
2 parents a9d2ca7 + 95d4274 commit a5fa288

File tree

2 files changed

+307
-39
lines changed

2 files changed

+307
-39
lines changed

pkg/kstatus/polling/clusterreader/caching_reader.go

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,14 @@ import (
1111

1212
apierrors "k8s.io/apimachinery/pkg/api/errors"
1313
"k8s.io/apimachinery/pkg/api/meta"
14+
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
15+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1416
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
17+
"k8s.io/apimachinery/pkg/fields"
1518
"k8s.io/apimachinery/pkg/labels"
19+
"k8s.io/apimachinery/pkg/runtime"
1620
"k8s.io/apimachinery/pkg/runtime/schema"
21+
"k8s.io/client-go/tools/pager"
1722
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
1823
"sigs.k8s.io/cli-utils/pkg/object"
1924
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -232,13 +237,11 @@ func (c *CachingClusterReader) Sync(ctx context.Context) error {
232237
}
233238
return err
234239
}
235-
var listOptions []client.ListOption
240+
ns := ""
236241
if mapping.Scope == meta.RESTScopeNamespace {
237-
listOptions = append(listOptions, client.InNamespace(gn.Namespace))
242+
ns = gn.Namespace
238243
}
239-
var list unstructured.UnstructuredList
240-
list.SetGroupVersionKind(mapping.GroupVersionKind)
241-
err = c.reader.List(ctx, &list, listOptions...)
244+
list, err := c.listUnstructured(ctx, mapping.GroupVersionKind, ns)
242245
if err != nil {
243246
// If the context was cancelled, we just stop the work and return
244247
// the error.
@@ -254,9 +257,82 @@ func (c *CachingClusterReader) Sync(ctx context.Context) error {
254257
continue
255258
}
256259
cache[gn] = cacheEntry{
257-
resources: list,
260+
resources: *list,
258261
}
259262
}
260263
c.cache = cache
261264
return nil
262265
}
266+
267+
// listUnstructured performs one or more LIST calls, paginating the requests
268+
// and aggregating the results. If aggregated, only the ResourceVersion,
269+
// SelfLink, and Items will be populated. The default page size is 500.
270+
func (c *CachingClusterReader) listUnstructured(
271+
ctx context.Context,
272+
gvk schema.GroupVersionKind,
273+
namespace string,
274+
) (*unstructured.UnstructuredList, error) {
275+
mOpts := metav1.ListOptions{}
276+
mOpts.SetGroupVersionKind(gvk)
277+
obj, _, err := pager.New(c.listPageFunc(namespace)).List(ctx, mOpts)
278+
if err != nil {
279+
return nil, err
280+
}
281+
282+
switch t := obj.(type) {
283+
case *unstructured.UnstructuredList:
284+
// all in one
285+
return t, nil
286+
case *metainternalversion.List:
287+
// aggregated result
288+
u := &unstructured.UnstructuredList{}
289+
u.SetGroupVersionKind(gvk)
290+
// Only ResourceVersion & SelfLink are copied into the aggregated result
291+
// by ListPager.
292+
if t.ResourceVersion != "" {
293+
u.SetResourceVersion(t.ResourceVersion)
294+
}
295+
if t.SelfLink != "" {
296+
u.SetSelfLink(t.SelfLink)
297+
}
298+
u.Items = make([]unstructured.Unstructured, len(t.Items))
299+
for i, item := range t.Items {
300+
ui, ok := item.(*unstructured.Unstructured)
301+
if !ok {
302+
return nil, fmt.Errorf("unexpected list item type: %t", item)
303+
}
304+
u.Items[i] = *ui
305+
}
306+
return u, nil
307+
default:
308+
return nil, fmt.Errorf("unexpected list type: %t", t)
309+
}
310+
}
311+
312+
func (c *CachingClusterReader) listPageFunc(namespace string) pager.ListPageFunc {
313+
return func(ctx context.Context, mOpts metav1.ListOptions) (runtime.Object, error) {
314+
mOptsCopy := mOpts
315+
labelSelector, err := labels.Parse(mOpts.LabelSelector)
316+
if err != nil {
317+
return nil, fmt.Errorf("failed to parse label selector: %w", err)
318+
}
319+
fieldSelector, err := fields.ParseSelector(mOpts.FieldSelector)
320+
if err != nil {
321+
return nil, fmt.Errorf("failed to parse field selector: %w", err)
322+
}
323+
cOpts := &client.ListOptions{
324+
LabelSelector: labelSelector,
325+
FieldSelector: fieldSelector,
326+
Namespace: namespace,
327+
Limit: mOpts.Limit,
328+
Continue: mOpts.Continue,
329+
Raw: &mOptsCopy,
330+
}
331+
var list unstructured.UnstructuredList
332+
list.SetGroupVersionKind(mOpts.GroupVersionKind())
333+
// Note: client.ListOptions only supports Exact ResourceVersion matching.
334+
// So leave ResourceVersion blank to get Any ResourceVersion.
335+
err = c.reader.List(ctx, &list, cOpts)
336+
return &list, err
337+
}
338+
}

0 commit comments

Comments
 (0)