
Commit 849c153

Allow list items to be processed in parallel
Signed-off-by: Shady Rafehi <[email protected]>
1 parent cebed7e commit 849c153

File tree

4 files changed: +163 −52 lines


pkg/cache/cluster.go

Lines changed: 83 additions & 17 deletions
@@ -56,6 +56,8 @@ const (
     // Limit is required to avoid memory spikes during cache initialization.
     // The default limit of 50 is chosen based on experiments.
     defaultListSemaphoreWeight = 50
+    // defaultListItemSemaphoreWeight limits the amount of items to process in parallel for each k8s list.
+    defaultListItemSemaphoreWeight = int64(1)
     // defaultEventProcessingInterval is the default interval for processing events
     defaultEventProcessingInterval = 100 * time.Millisecond
 )
@@ -164,15 +166,16 @@ type ListRetryFunc func(err error) bool
 func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
     log := textlogger.NewLogger(textlogger.NewConfig())
     cache := &clusterCache{
-        settings:           Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
-        apisMeta:           make(map[schema.GroupKind]*apiMeta),
-        eventMetaCh:        nil,
-        listPageSize:       defaultListPageSize,
-        listPageBufferSize: defaultListPageBufferSize,
-        listSemaphore:      semaphore.NewWeighted(defaultListSemaphoreWeight),
-        resources:          make(map[kube.ResourceKey]*Resource),
-        nsIndex:            make(map[string]map[kube.ResourceKey]*Resource),
-        config:             config,
+        settings:                Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
+        apisMeta:                make(map[schema.GroupKind]*apiMeta),
+        eventMetaCh:             nil,
+        listPageSize:            defaultListPageSize,
+        listPageBufferSize:      defaultListPageBufferSize,
+        listSemaphore:           semaphore.NewWeighted(defaultListSemaphoreWeight),
+        listItemSemaphoreWeight: defaultListItemSemaphoreWeight,
+        resources:               make(map[kube.ResourceKey]*Resource),
+        nsIndex:                 make(map[string]map[kube.ResourceKey]*Resource),
+        config:                  config,
         kubectl: &kube.KubectlCmd{
             Log:    log,
             Tracer: tracing.NopTracer{},
@@ -219,8 +222,9 @@ type clusterCache struct {
     // size of a page for list operations pager.
     listPageSize int64
     // number of pages to prefetch for list pager.
-    listPageBufferSize int32
-    listSemaphore      WeightedSemaphore
+    listPageBufferSize      int32
+    listSemaphore           WeightedSemaphore
+    listItemSemaphoreWeight int64

     // retry options for list operations
     listRetryLimit int32
@@ -262,6 +266,35 @@ type clusterCacheSync struct {
     resyncTimeout time.Duration
 }

+// listItemTaskLimiter limits the amount of list items to process in parallel.
+type listItemTaskLimiter struct {
+    sem WeightedSemaphore
+    wg  sync.WaitGroup
+}
+
+// Run executes the given task concurrently, blocking if the pool is at capacity.
+func (t *listItemTaskLimiter) Run(ctx context.Context, task func()) error {
+    t.wg.Add(1)
+    if err := t.sem.Acquire(ctx, 1); err != nil {
+        t.wg.Done()
+        return fmt.Errorf("failed to acquire semaphore: %w", err)
+    }
+
+    go func() {
+        defer t.wg.Done()
+        defer t.sem.Release(1)
+
+        task()
+    }()
+
+    return nil
+}
+
+// Wait blocks until all submitted tasks have completed.
+func (t *listItemTaskLimiter) Wait() {
+    t.wg.Wait()
+}
+
 // ListRetryFuncNever never retries on errors
 func ListRetryFuncNever(_ error) bool {
     return false
@@ -446,6 +479,13 @@ func (c *clusterCache) newResource(un *unstructured.Unstructured) *Resource {
     return resource
 }

+func (c *clusterCache) newListItemTaskLimiter() *listItemTaskLimiter {
+    return &listItemTaskLimiter{
+        sem: semaphore.NewWeighted(c.listItemSemaphoreWeight),
+        wg:  sync.WaitGroup{},
+    }
+}
+
 func (c *clusterCache) setNode(n *Resource) {
     key := n.ResourceKey()
     c.resources[key] = n
@@ -629,17 +669,33 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso

 // loadInitialState loads the state of all the resources retrieved by the given resource client.
 func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
-    var items []*Resource
+    var (
+        items    []*Resource
+        listLock = sync.Mutex{}
+        limiter  = c.newListItemTaskLimiter()
+    )
+
     resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
         return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
             if un, ok := obj.(*unstructured.Unstructured); !ok {
                 return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
             } else {
-                items = append(items, c.newResource(un))
+                if err := limiter.Run(ctx, func() {
+                    newRes := c.newResource(un)
+                    listLock.Lock()
+                    items = append(items, newRes)
+                    listLock.Unlock()
+                }); err != nil {
+                    return fmt.Errorf("failed to process list item: %w", err)
+                }
             }
             return nil
         })
     })
+
+    // Wait until all items have completed processing.
+    limiter.Wait()
+
     if err != nil {
         return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
     }
@@ -938,19 +994,29 @@ func (c *clusterCache) sync() error {
         lock.Unlock()

         return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
+            limiter := c.newListItemTaskLimiter()
+
             resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
                 return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
                     if un, ok := obj.(*unstructured.Unstructured); !ok {
                         return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
                     } else {
-                        newRes := c.newResource(un)
-                        lock.Lock()
-                        c.setNode(newRes)
-                        lock.Unlock()
+                        if err := limiter.Run(ctx, func() {
+                            newRes := c.newResource(un)
+                            lock.Lock()
+                            c.setNode(newRes)
+                            lock.Unlock()
+                        }); err != nil {
+                            return fmt.Errorf("failed to process list item: %w", err)
+                        }
                     }
                     return nil
                 })
             })
+
+            // Wait until all items have completed processing.
+            limiter.Wait()
+
             if err != nil {
                 if c.isRestrictedResource(err) {
                     keep := false
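
The listItemTaskLimiter added above is a small bounded-concurrency helper: Run acquires one unit of the weighted semaphore before spawning a goroutine for the task, the goroutine releases the unit when it finishes, and Wait blocks until every submitted task has drained. The following self-contained sketch (illustrative only, not part of this commit; the weight of 4 and the counter are made up) shows the same pattern using golang.org/x/sync/semaphore directly:

package main

import (
    "context"
    "fmt"
    "sync"

    "golang.org/x/sync/semaphore"
)

func main() {
    ctx := context.Background()
    sem := semaphore.NewWeighted(4) // analogous to listItemSemaphoreWeight
    var (
        wg        sync.WaitGroup
        mu        sync.Mutex
        processed int
    )

    for i := 0; i < 20; i++ {
        wg.Add(1)
        // Blocks while 4 tasks are already in flight, mirroring limiter.Run.
        if err := sem.Acquire(ctx, 1); err != nil {
            wg.Done()
            panic(err)
        }
        go func() {
            defer wg.Done()
            defer sem.Release(1)
            // Stand-in for c.newResource(un) plus the lock-protected append/setNode.
            mu.Lock()
            processed++
            mu.Unlock()
        }()
    }

    wg.Wait() // mirrors limiter.Wait before the list result is consumed
    fmt.Println("processed:", processed)
}

With the default weight of 1 (defaultListItemSemaphoreWeight) at most one task runs at a time, so item processing stays effectively sequential unless a consumer raises the weight.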

pkg/cache/cluster_test.go

Lines changed: 64 additions & 35 deletions
@@ -147,46 +147,75 @@ func getChildren(cluster *clusterCache, un *unstructured.Unstructured) []*Resour
     return hierarchy[1:]
 }

-// Benchmark_sync is meant to simulate cluster initialization when populateResourceInfoHandler does nontrivial work.
-func Benchmark_sync(t *testing.B) {
-    resources := []runtime.Object{}
-    for i := 0; i < 100; i++ {
-        resources = append(resources, &corev1.Pod{
-            ObjectMeta: metav1.ObjectMeta{
-                Name:      fmt.Sprintf("pod-%d", i),
-                Namespace: "default",
-            },
-        }, &appsv1.ReplicaSet{
-            ObjectMeta: metav1.ObjectMeta{
-                Name:      fmt.Sprintf("rs-%d", i),
-                Namespace: "default",
-            },
-        }, &appsv1.Deployment{
-            ObjectMeta: metav1.ObjectMeta{
-                Name:      fmt.Sprintf("deploy-%d", i),
-                Namespace: "default",
-            },
-        }, &appsv1.StatefulSet{
-            ObjectMeta: metav1.ObjectMeta{
-                Name:      fmt.Sprintf("sts-%d", i),
-                Namespace: "default",
-            },
-        })
-    }
+// BenchmarkSync benchmarks cluster initialization when populateResourceInfoHandler does nontrivial work.
+// The benchmark is executed using different list item semaphore weights.
+func BenchmarkSync(b *testing.B) {
+    run := func(bb *testing.B, weight int64, overhead time.Duration) {
+        resources := []runtime.Object{}
+        for i := 0; i < 100; i++ {
+            resources = append(resources, &corev1.Pod{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      fmt.Sprintf("pod-%d", i),
+                    Namespace: "default",
+                },
+            }, &appsv1.ReplicaSet{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      fmt.Sprintf("rs-%d", i),
+                    Namespace: "default",
+                },
+            }, &appsv1.Deployment{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      fmt.Sprintf("deploy-%d", i),
+                    Namespace: "default",
+                },
+            }, &appsv1.StatefulSet{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      fmt.Sprintf("sts-%d", i),
+                    Namespace: "default",
+                },
+            })
+        }

-    c := newCluster(t, resources...)
+        c := newCluster(bb, resources...)
+        c.listItemSemaphoreWeight = weight

-    c.populateResourceInfoHandler = func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
-        time.Sleep(10 * time.Microsecond)
-        return nil, false
-    }
+        c.populateResourceInfoHandler = func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
+            time.Sleep(overhead)
+            return nil, false
+        }

-    t.ResetTimer()
+        bb.ResetTimer()

-    for n := 0; n < t.N; n++ {
-        err := c.sync()
-        require.NoError(t, err)
+        for n := 0; n < bb.N; n++ {
+            err := c.sync()
+            require.NoError(bb, err)
+        }
     }
+
+    b.Run("weight=1,overhead=100μs", func(bb *testing.B) {
+        run(bb, 1, 100*time.Microsecond)
+    })
+    b.Run("weight=2,overhead=100μs", func(bb *testing.B) {
+        run(bb, 2, 100*time.Microsecond)
+    })
+    b.Run("weight=4,overhead=100μs", func(bb *testing.B) {
+        run(bb, 4, 100*time.Microsecond)
+    })
+    b.Run("weight=8,overhead=100μs", func(bb *testing.B) {
+        run(bb, 8, 100*time.Microsecond)
+    })
+    b.Run("weight=1,overhead=500μs", func(bb *testing.B) {
+        run(bb, 1, 500*time.Microsecond)
+    })
+    b.Run("weight=2,overhead=500μs", func(bb *testing.B) {
+        run(bb, 2, 500*time.Microsecond)
+    })
+    b.Run("weight=4,overhead=500μs", func(bb *testing.B) {
+        run(bb, 4, 500*time.Microsecond)
+    })
+    b.Run("weight=8,overhead=500μs", func(bb *testing.B) {
+        run(bb, 8, 500*time.Microsecond)
+    })
 }

 func TestEnsureSynced(t *testing.T) {
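
To compare the weights locally, the sub-benchmarks can be run with the standard Go tooling, for example go test -run '^$' -bench BenchmarkSync ./pkg/cache/ from the repository root (adjust the package path if needed); each weight/overhead combination then reports its own result.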

pkg/cache/settings.go

Lines changed: 7 additions & 0 deletions
@@ -102,6 +102,13 @@ func SetListSemaphore(listSemaphore WeightedSemaphore) UpdateSettingsFunc {
     }
 }

+// SetListItemSemaphoreWeight sets the weight to limit the amount of k8s list items to process in parallel.
+func SetListItemSemaphoreWeight(listItemSemaphoreWeight int64) UpdateSettingsFunc {
+    return func(cache *clusterCache) {
+        cache.listItemSemaphoreWeight = listItemSemaphoreWeight
+    }
+}
+
 // SetResyncTimeout updates cluster re-sync timeout
 func SetResyncTimeout(timeout time.Duration) UpdateSettingsFunc {
     return func(cache *clusterCache) {
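
A hedged usage sketch (written in-package, since NewClusterCache returns the unexported *clusterCache; the helper name newParallelListCache is illustrative and not part of this commit): the new option composes with the existing constructor like any other UpdateSettingsFunc.

package cache

import "k8s.io/client-go/rest"

// newParallelListCache is illustrative only: it builds a cluster cache that
// processes up to 8 items of each Kubernetes list in parallel.
func newParallelListCache(config *rest.Config) *clusterCache {
    return NewClusterCache(config, SetListItemSemaphoreWeight(8))
}

Consumers that keep the default weight of 1 continue to process list items one at a time.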

pkg/cache/settings_test.go

Lines changed: 9 additions & 0 deletions
@@ -72,3 +72,12 @@ func TestSetEventsProcessingInterval(t *testing.T) {
     cache.Invalidate(SetEventProcessingInterval(interval))
     assert.Equal(t, interval, cache.eventProcessingInterval)
 }
+
+func TestSetListItemSemaphoreWeight(t *testing.T) {
+    cache := NewClusterCache(&rest.Config{})
+    assert.Equal(t, defaultListItemSemaphoreWeight, cache.listItemSemaphoreWeight)
+
+    weight := int64(8)
+    cache.Invalidate(SetListItemSemaphoreWeight(weight))
+    assert.Equal(t, weight, cache.listItemSemaphoreWeight)
+}
