Skip to content

Commit 0f1cd01

Browse files
committed
Allow list items to be processed in parallel
Signed-off-by: Shady Rafehi <[email protected]>
1 parent cebed7e commit 0f1cd01

File tree

4 files changed

+136
-24
lines changed

4 files changed

+136
-24
lines changed

pkg/cache/cluster.go

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ const (
5656
// Limit is required to avoid memory spikes during cache initialization.
5757
// The default limit of 50 is chosen based on experiments.
5858
defaultListSemaphoreWeight = 50
59+
// defaultListItemSemaphoreWeight is the default limit on the number of items processed in parallel for each k8s list.
60+
defaultListItemSemaphoreWeight = int64(1)
5961
// defaultEventProcessingInterval is the default interval for processing events
6062
defaultEventProcessingInterval = 100 * time.Millisecond
6163
)
@@ -164,15 +166,16 @@ type ListRetryFunc func(err error) bool
164166
func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
165167
log := textlogger.NewLogger(textlogger.NewConfig())
166168
cache := &clusterCache{
167-
settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
168-
apisMeta: make(map[schema.GroupKind]*apiMeta),
169-
eventMetaCh: nil,
170-
listPageSize: defaultListPageSize,
171-
listPageBufferSize: defaultListPageBufferSize,
172-
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
173-
resources: make(map[kube.ResourceKey]*Resource),
174-
nsIndex: make(map[string]map[kube.ResourceKey]*Resource),
175-
config: config,
169+
settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
170+
apisMeta: make(map[schema.GroupKind]*apiMeta),
171+
eventMetaCh: nil,
172+
listPageSize: defaultListPageSize,
173+
listPageBufferSize: defaultListPageBufferSize,
174+
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
175+
listItemSemaphoreWeight: defaultListItemSemaphoreWeight,
176+
resources: make(map[kube.ResourceKey]*Resource),
177+
nsIndex: make(map[string]map[kube.ResourceKey]*Resource),
178+
config: config,
176179
kubectl: &kube.KubectlCmd{
177180
Log: log,
178181
Tracer: tracing.NopTracer{},
@@ -219,8 +222,9 @@ type clusterCache struct {
219222
// size of a page for list operations pager.
220223
listPageSize int64
221224
// number of pages to prefetch for list pager.
222-
listPageBufferSize int32
223-
listSemaphore WeightedSemaphore
225+
listPageBufferSize int32
226+
listSemaphore WeightedSemaphore
227+
listItemSemaphoreWeight int64
224228

225229
// retry options for list operations
226230
listRetryLimit int32
@@ -262,6 +266,35 @@ type clusterCacheSync struct {
262266
resyncTimeout time.Duration
263267
}
264268

269+
// listItemTaskLimiter limits the amount of list items to process in parallel.
270+
type listItemTaskLimiter struct {
271+
sem WeightedSemaphore
272+
wg sync.WaitGroup
273+
}
274+
275+
// Run executes the given task concurrently, blocking if the pool is at capacity.
276+
func (t *listItemTaskLimiter) Run(ctx context.Context, task func()) error {
277+
t.wg.Add(1)
278+
if err := t.sem.Acquire(ctx, 1); err != nil {
279+
t.wg.Done()
280+
return fmt.Errorf("failed to acquire semaphore: %w", err)
281+
}
282+
283+
go func() {
284+
defer t.wg.Done()
285+
defer t.sem.Release(1)
286+
287+
task()
288+
}()
289+
290+
return nil
291+
}
292+
293+
// Wait blocks until all submitted tasks have completed.
294+
func (t *listItemTaskLimiter) Wait() {
295+
t.wg.Wait()
296+
}
297+
265298
// ListRetryFuncNever never retries on errors
266299
func ListRetryFuncNever(_ error) bool {
267300
return false
@@ -446,6 +479,13 @@ func (c *clusterCache) newResource(un *unstructured.Unstructured) *Resource {
446479
return resource
447480
}
448481

482+
func (c *clusterCache) newListItemTaskLimiter() *listItemTaskLimiter {
483+
return &listItemTaskLimiter{
484+
sem: semaphore.NewWeighted(c.listItemSemaphoreWeight),
485+
wg: sync.WaitGroup{},
486+
}
487+
}
488+
449489
func (c *clusterCache) setNode(n *Resource) {
450490
key := n.ResourceKey()
451491
c.resources[key] = n
@@ -629,17 +669,33 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
629669

630670
// loadInitialState loads the state of all the resources retrieved by the given resource client.
631671
func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
632-
var items []*Resource
672+
var (
673+
items []*Resource
674+
listLock = sync.Mutex{}
675+
limiter = c.newListItemTaskLimiter()
676+
)
677+
633678
resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
634679
return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
635680
if un, ok := obj.(*unstructured.Unstructured); !ok {
636681
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
637682
} else {
638-
items = append(items, c.newResource(un))
683+
if err := limiter.Run(ctx, func() {
684+
newRes := c.newResource(un)
685+
listLock.Lock()
686+
items = append(items, newRes)
687+
listLock.Unlock()
688+
}); err != nil {
689+
return fmt.Errorf("failed to process list item: %w", err)
690+
}
639691
}
640692
return nil
641693
})
642694
})
695+
696+
// Wait until all items have completed processing.
697+
limiter.Wait()
698+
643699
if err != nil {
644700
return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
645701
}
@@ -938,19 +994,29 @@ func (c *clusterCache) sync() error {
938994
lock.Unlock()
939995

940996
return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
997+
limiter := c.newListItemTaskLimiter()
998+
941999
resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
9421000
return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
9431001
if un, ok := obj.(*unstructured.Unstructured); !ok {
9441002
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
9451003
} else {
946-
newRes := c.newResource(un)
947-
lock.Lock()
948-
c.setNode(newRes)
949-
lock.Unlock()
1004+
if err := limiter.Run(ctx, func() {
1005+
newRes := c.newResource(un)
1006+
lock.Lock()
1007+
c.setNode(newRes)
1008+
lock.Unlock()
1009+
}); err != nil {
1010+
return fmt.Errorf("failed to process list item: %w", err)
1011+
}
9501012
}
9511013
return nil
9521014
})
9531015
})
1016+
1017+
// Wait until all items have completed processing.
1018+
limiter.Wait()
1019+
9541020
if err != nil {
9551021
if c.isRestrictedResource(err) {
9561022
keep := false

pkg/cache/cluster_test.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,37 @@ func getChildren(cluster *clusterCache, un *unstructured.Unstructured) []*Resour
147147
return hierarchy[1:]
148148
}
149149

150-
// Benchmark_sync is meant to simulate cluster initialization when populateResourceInfoHandler does nontrivial work.
151-
func Benchmark_sync(t *testing.B) {
150+
// BenchmarkSync benchmarks cluster initialization when populateResourceInfoHandler does nontrivial work.
151+
// The benchmark is executed using different list item semaphore weights.
152+
func BenchmarkSync(b *testing.B) {
153+
b.Run("weight=1,overhead=100μs", func(bb *testing.B) {
154+
runBenchmarkSync(bb, 1, 100*time.Microsecond)
155+
})
156+
b.Run("weight=2,overhead=100μs", func(bb *testing.B) {
157+
runBenchmarkSync(bb, 2, 100*time.Microsecond)
158+
})
159+
b.Run("weight=4,overhead=100μs", func(bb *testing.B) {
160+
runBenchmarkSync(bb, 4, 100*time.Microsecond)
161+
})
162+
b.Run("weight=8,overhead=100μs", func(bb *testing.B) {
163+
runBenchmarkSync(bb, 8, 100*time.Microsecond)
164+
})
165+
166+
b.Run("weight=1,overhead=500μs", func(bb *testing.B) {
167+
runBenchmarkSync(bb, 1, 500*time.Microsecond)
168+
})
169+
b.Run("weight=2,overhead=500μs", func(bb *testing.B) {
170+
runBenchmarkSync(bb, 2, 500*time.Microsecond)
171+
})
172+
b.Run("weight=4,overhead=500μs", func(bb *testing.B) {
173+
runBenchmarkSync(bb, 4, 500*time.Microsecond)
174+
})
175+
b.Run("weight=8,overhead=500μs", func(bb *testing.B) {
176+
runBenchmarkSync(bb, 8, 500*time.Microsecond)
177+
})
178+
}
179+
180+
func runBenchmarkSync(b *testing.B, weight int64, overhead time.Duration) {
152181
resources := []runtime.Object{}
153182
for i := 0; i < 100; i++ {
154183
resources = append(resources, &corev1.Pod{
@@ -174,18 +203,19 @@ func Benchmark_sync(t *testing.B) {
174203
})
175204
}
176205

177-
c := newCluster(t, resources...)
206+
c := newCluster(b, resources...)
207+
c.listItemSemaphoreWeight = weight
178208

179209
c.populateResourceInfoHandler = func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
180-
time.Sleep(10 * time.Microsecond)
210+
time.Sleep(overhead)
181211
return nil, false
182212
}
183213

184-
t.ResetTimer()
214+
b.ResetTimer()
185215

186-
for n := 0; n < t.N; n++ {
216+
for n := 0; n < b.N; n++ {
187217
err := c.sync()
188-
require.NoError(t, err)
218+
require.NoError(b, err)
189219
}
190220
}
191221

pkg/cache/settings.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,13 @@ func SetListSemaphore(listSemaphore WeightedSemaphore) UpdateSettingsFunc {
102102
}
103103
}
104104

105+
// SetListItemSemaphoreWeight sets the weight to limit the amount of k8s list items to process in parallel.
106+
func SetListItemSemaphoreWeight(listItemSemaphoreWeight int64) UpdateSettingsFunc {
107+
return func(cache *clusterCache) {
108+
cache.listItemSemaphoreWeight = listItemSemaphoreWeight
109+
}
110+
}
111+
105112
// SetResyncTimeout updates cluster re-sync timeout
106113
func SetResyncTimeout(timeout time.Duration) UpdateSettingsFunc {
107114
return func(cache *clusterCache) {

pkg/cache/settings_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,12 @@ func TestSetEventsProcessingInterval(t *testing.T) {
7272
cache.Invalidate(SetEventProcessingInterval(interval))
7373
assert.Equal(t, interval, cache.eventProcessingInterval)
7474
}
75+
76+
func TestSetListItemSemaphoreWeight(t *testing.T) {
77+
cache := NewClusterCache(&rest.Config{})
78+
assert.Equal(t, defaultListItemSemaphoreWeight, cache.listItemSemaphoreWeight)
79+
80+
weight := int64(8)
81+
cache.Invalidate(SetListItemSemaphoreWeight(weight))
82+
assert.Equal(t, weight, cache.listItemSemaphoreWeight)
83+
}

0 commit comments

Comments
 (0)