diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index efbe19d1b..1a2749b5e 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -56,6 +56,8 @@ const (
 	// Limit is required to avoid memory spikes during cache initialization.
 	// The default limit of 50 is chosen based on experiments.
 	defaultListSemaphoreWeight = 50
+	// defaultListItemSemaphoreWeight limits the number of items to process in parallel for each k8s list.
+	defaultListItemSemaphoreWeight = int64(1)
 	// defaultEventProcessingInterval is the default interval for processing events
 	defaultEventProcessingInterval = 100 * time.Millisecond
 )
@@ -164,15 +166,16 @@ type ListRetryFunc func(err error) bool
 func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
 	log := textlogger.NewLogger(textlogger.NewConfig())
 	cache := &clusterCache{
-		settings:           Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
-		apisMeta:           make(map[schema.GroupKind]*apiMeta),
-		eventMetaCh:        nil,
-		listPageSize:       defaultListPageSize,
-		listPageBufferSize: defaultListPageBufferSize,
-		listSemaphore:      semaphore.NewWeighted(defaultListSemaphoreWeight),
-		resources:          make(map[kube.ResourceKey]*Resource),
-		nsIndex:            make(map[string]map[kube.ResourceKey]*Resource),
-		config:             config,
+		settings:                Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
+		apisMeta:                make(map[schema.GroupKind]*apiMeta),
+		eventMetaCh:             nil,
+		listPageSize:            defaultListPageSize,
+		listPageBufferSize:      defaultListPageBufferSize,
+		listSemaphore:           semaphore.NewWeighted(defaultListSemaphoreWeight),
+		listItemSemaphoreWeight: defaultListItemSemaphoreWeight,
+		resources:               make(map[kube.ResourceKey]*Resource),
+		nsIndex:                 make(map[string]map[kube.ResourceKey]*Resource),
+		config:                  config,
 		kubectl: &kube.KubectlCmd{
 			Log:    log,
 			Tracer: tracing.NopTracer{},
@@ -219,8 +222,9 @@ type clusterCache struct {
 	// size of a page for list operations pager.
 	listPageSize int64
 	// number of pages to prefetch for list pager.
-	listPageBufferSize int32
-	listSemaphore      WeightedSemaphore
+	listPageBufferSize      int32
+	listSemaphore           WeightedSemaphore
+	listItemSemaphoreWeight int64
 
 	// retry options for list operations
 	listRetryLimit int32
@@ -262,6 +266,35 @@ type clusterCacheSync struct {
 	resyncTimeout time.Duration
 }
 
+// listItemTaskLimiter limits the number of list items to process in parallel.
+type listItemTaskLimiter struct {
+	sem WeightedSemaphore
+	wg  sync.WaitGroup
+}
+
+// Run executes the given task concurrently, blocking if the pool is at capacity.
+func (t *listItemTaskLimiter) Run(ctx context.Context, task func()) error {
+	t.wg.Add(1)
+	if err := t.sem.Acquire(ctx, 1); err != nil {
+		t.wg.Done()
+		return fmt.Errorf("failed to acquire semaphore: %w", err)
+	}
+
+	go func() {
+		defer t.wg.Done()
+		defer t.sem.Release(1)
+
+		task()
+	}()
+
+	return nil
+}
+
+// Wait blocks until all submitted tasks have completed.
+func (t *listItemTaskLimiter) Wait() {
+	t.wg.Wait()
+}
+
 // ListRetryFuncNever never retries on errors
 func ListRetryFuncNever(_ error) bool {
 	return false
@@ -446,6 +479,13 @@ func (c *clusterCache) newResource(un *unstructured.Unstructured) *Resource {
 	return resource
 }
 
+func (c *clusterCache) newListItemTaskLimiter() *listItemTaskLimiter {
+	return &listItemTaskLimiter{
+		sem: semaphore.NewWeighted(c.listItemSemaphoreWeight),
+		wg:  sync.WaitGroup{},
+	}
+}
+
 func (c *clusterCache) setNode(n *Resource) {
 	key := n.ResourceKey()
 	c.resources[key] = n
@@ -629,17 +669,33 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
 
 // loadInitialState loads the state of all the resources retrieved by the given resource client.
 func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
-	var items []*Resource
+	var (
+		items    []*Resource
+		listLock = sync.Mutex{}
+		limiter  = c.newListItemTaskLimiter()
+	)
+
 	resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
 		return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
 			if un, ok := obj.(*unstructured.Unstructured); !ok {
 				return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
 			} else {
-				items = append(items, c.newResource(un))
+				if err := limiter.Run(ctx, func() {
+					newRes := c.newResource(un)
+					listLock.Lock()
+					items = append(items, newRes)
+					listLock.Unlock()
+				}); err != nil {
+					return fmt.Errorf("failed to process list item: %w", err)
+				}
 			}
 			return nil
 		})
 	})
+
+	// Wait until all items have completed processing.
+	limiter.Wait()
+
 	if err != nil {
 		return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
 	}
@@ -938,19 +994,29 @@ func (c *clusterCache) sync() error {
 		lock.Unlock()
 
 		return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
+			limiter := c.newListItemTaskLimiter()
+
 			resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
 				return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
 					if un, ok := obj.(*unstructured.Unstructured); !ok {
 						return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
 					} else {
-						newRes := c.newResource(un)
-						lock.Lock()
-						c.setNode(newRes)
-						lock.Unlock()
+						if err := limiter.Run(ctx, func() {
+							newRes := c.newResource(un)
+							lock.Lock()
+							c.setNode(newRes)
+							lock.Unlock()
+						}); err != nil {
+							return fmt.Errorf("failed to process list item: %w", err)
+						}
					}
 					return nil
 				})
 			})
+
+			// Wait until all items have completed processing.
+			limiter.Wait()
+
 			if err != nil {
 				if c.isRestrictedResource(err) {
 					keep := false
diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index 8aa286fc4..9e98f0ff6 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -147,8 +147,39 @@ func getChildren(cluster *clusterCache, un *unstructured.Unstructured) []*Resour
 	return hierarchy[1:]
 }
 
-// Benchmark_sync is meant to simulate cluster initialization when populateResourceInfoHandler does nontrivial work.
-func Benchmark_sync(t *testing.B) {
+// BenchmarkSync benchmarks cluster initialization when populateResourceInfoHandler does nontrivial work.
+// The benchmark is executed using different list item semaphore weights.
+func BenchmarkSync(b *testing.B) {
+	b.Run("weight=1,overhead=100μs", func(bb *testing.B) {
+		runBenchmarkSync(bb, 1, 100*time.Microsecond)
+	})
+	b.Run("weight=2,overhead=100μs", func(bb *testing.B) {
+		runBenchmarkSync(bb, 2, 100*time.Microsecond)
+	})
+	b.Run("weight=4,overhead=100μs", func(bb *testing.B) {
+		runBenchmarkSync(bb, 4, 100*time.Microsecond)
+	})
+	b.Run("weight=8,overhead=100μs", func(bb *testing.B) {
+		runBenchmarkSync(bb, 8, 100*time.Microsecond)
+	})
+
+	b.Run("weight=1,overhead=500μs", func(bb *testing.B) {
+		runBenchmarkSync(bb, 1, 500*time.Microsecond)
+	})
+	b.Run("weight=2,overhead=500μs", func(bb *testing.B) {
+		runBenchmarkSync(bb, 2, 500*time.Microsecond)
+	})
+	b.Run("weight=4,overhead=500μs", func(bb *testing.B) {
+		runBenchmarkSync(bb, 4, 500*time.Microsecond)
+	})
+	b.Run("weight=8,overhead=500μs", func(bb *testing.B) {
+		runBenchmarkSync(bb, 8, 500*time.Microsecond)
+	})
+}
+
+func runBenchmarkSync(b *testing.B, weight int64, overhead time.Duration) {
+	b.Helper()
+
 	resources := []runtime.Object{}
 	for i := 0; i < 100; i++ {
 		resources = append(resources, &corev1.Pod{
@@ -174,18 +205,19 @@
 		})
 	}
 
-	c := newCluster(t, resources...)
+	c := newCluster(b, resources...)
+	c.listItemSemaphoreWeight = weight
 
 	c.populateResourceInfoHandler = func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
-		time.Sleep(10 * time.Microsecond)
+		time.Sleep(overhead)
 		return nil, false
 	}
 
-	t.ResetTimer()
+	b.ResetTimer()
 
-	for n := 0; n < t.N; n++ {
+	for n := 0; n < b.N; n++ {
 		err := c.sync()
-		require.NoError(t, err)
+		require.NoError(b, err)
 	}
 }
 
diff --git a/pkg/cache/settings.go b/pkg/cache/settings.go
index 692ac2674..b6eb3760b 100644
--- a/pkg/cache/settings.go
+++ b/pkg/cache/settings.go
@@ -102,6 +102,13 @@ func SetListSemaphore(listSemaphore WeightedSemaphore) UpdateSettingsFunc {
 	}
 }
 
+// SetListItemSemaphoreWeight sets the weight to limit the number of k8s list items to process in parallel.
+func SetListItemSemaphoreWeight(listItemSemaphoreWeight int64) UpdateSettingsFunc {
+	return func(cache *clusterCache) {
+		cache.listItemSemaphoreWeight = listItemSemaphoreWeight
+	}
+}
+
 // SetResyncTimeout updates cluster re-sync timeout
 func SetResyncTimeout(timeout time.Duration) UpdateSettingsFunc {
 	return func(cache *clusterCache) {
diff --git a/pkg/cache/settings_test.go b/pkg/cache/settings_test.go
index fdc1a7412..e309c266a 100644
--- a/pkg/cache/settings_test.go
+++ b/pkg/cache/settings_test.go
@@ -72,3 +72,12 @@ func TestSetEventsProcessingInterval(t *testing.T) {
 	cache.Invalidate(SetEventProcessingInterval(interval))
 	assert.Equal(t, interval, cache.eventProcessingInterval)
 }
+
+func TestSetListItemSemaphoreWeight(t *testing.T) {
+	cache := NewClusterCache(&rest.Config{})
+	assert.Equal(t, defaultListItemSemaphoreWeight, cache.listItemSemaphoreWeight)
+
+	weight := int64(8)
+	cache.Invalidate(SetListItemSemaphoreWeight(weight))
+	assert.Equal(t, weight, cache.listItemSemaphoreWeight)
+}
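
Usage sketch (not part of the patch): for a consuming project, the new knob is just another UpdateSettingsFunc passed to NewClusterCache. The import path and the exported ClusterCache interface below are assumed from the gitops-engine pkg/cache package touched by this diff; the weight of 8 is an arbitrary illustration rather than a recommended value and should be tuned with benchmarks such as BenchmarkSync above.

package example

import (
	"k8s.io/client-go/rest"

	"github.com/argoproj/gitops-engine/pkg/cache"
)

// newClusterCache builds a cluster cache that processes up to 8 list items
// concurrently while populateResourceInfoHandler runs during sync.
func newClusterCache(config *rest.Config) cache.ClusterCache {
	// The default weight of 1 keeps the previous sequential behavior; higher
	// weights only pay off when the resource info handler does nontrivial work.
	return cache.NewClusterCache(config, cache.SetListItemSemaphoreWeight(8))
}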