Skip to content

Commit d9c6c8a

Browse files
committed
cacher: apply key for initial events
For case of SendInitialEvents, a buffer of objects is created. That process takes a significant amount of memory and CPU when the resource is of a large volume. Many objects may be not relevant when key is provided. This commit applies key when composing the buffer for SendInitialEvents. Signed-off-by: Eric Lin <[email protected]>
1 parent f5d62f7 commit d9c6c8a

File tree

7 files changed

+27
-13
lines changed

7 files changed

+27
-13
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) {
300300
store.Add(elem)
301301
}
302302

303-
wci, err := newCacheIntervalFromStore(numObjects, store, getAttrsFunc)
303+
wci, err := newCacheIntervalFromStore(numObjects, store, getAttrsFunc, "", false)
304304
if err != nil {
305305
t.Fatal(err)
306306
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
622622
defer c.watchCache.RUnlock()
623623

624624
var cacheInterval *watchCacheInterval
625-
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, opts)
625+
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(requiredResourceVersion, key, opts)
626626
if err != nil {
627627
// To match the uncached watch implementation, once we have passed authn/authz/admission,
628628
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -712,9 +712,10 @@ func (w *watchCache) isIndexValidLocked(index int) bool {
712712
// getAllEventsSinceLocked returns a watchCacheInterval that can be used to
713713
// retrieve events since a certain resourceVersion. This function assumes to
714714
// be called under the watchCache lock.
715-
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storage.ListOptions) (*watchCacheInterval, error) {
715+
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string, opts storage.ListOptions) (*watchCacheInterval, error) {
716+
_, matchesSingle := opts.Predicate.MatchesSingle()
716717
if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
717-
return w.getIntervalFromStoreLocked()
718+
return w.getIntervalFromStoreLocked(key, matchesSingle)
718719
}
719720

720721
size := w.endIndex - w.startIndex
@@ -743,7 +744,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storag
743744
// current state and only then start watching from that point.
744745
//
745746
// TODO: In v2 api, we should stop returning the current state - #13969.
746-
return w.getIntervalFromStoreLocked()
747+
return w.getIntervalFromStoreLocked(key, matchesSingle)
747748
}
748749
// SendInitialEvents = false and resourceVersion = 0
749750
// means that the request would like to start watching
@@ -769,8 +770,8 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, opts storag
769770
// getIntervalFromStoreLocked returns a watchCacheInterval
770771
// that covers the entire storage state.
771772
// This function assumes to be called under the watchCache lock.
772-
func (w *watchCache) getIntervalFromStoreLocked() (*watchCacheInterval, error) {
773-
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
773+
func (w *watchCache) getIntervalFromStoreLocked(key string, matchesSingle bool) (*watchCacheInterval, error) {
774+
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc, key, matchesSingle)
774775
if err != nil {
775776
return nil, err
776777
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,22 @@ func (s sortableWatchCacheEvents) Swap(i, j int) {
133133
// returned by Next() need to be events from a List() done on the underlying store of
134134
// the watch cache.
135135
// The items returned in the interval will be sorted by Key.
136-
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) {
136+
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
137137
buffer := &watchCacheIntervalBuffer{}
138-
allItems := store.List()
138+
var allItems []interface{}
139+
140+
if matchesSingle {
141+
item, exists, err := store.GetByKey(key)
142+
if err != nil {
143+
return nil, err
144+
}
145+
146+
if exists {
147+
allItems = append(allItems, item)
148+
}
149+
} else {
150+
allItems = store.List()
151+
}
139152
buffer.buffer = make([]*watchCacheEvent, len(allItems))
140153
for i, item := range allItems {
141154
elem, ok := item.(*storeElement)

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ func TestCacheIntervalNextFromStore(t *testing.T) {
391391
store.Add(elem)
392392
}
393393

394-
wci, err := newCacheIntervalFromStore(rv, store, getAttrsFunc)
394+
wci, err := newCacheIntervalFromStore(rv, store, getAttrsFunc, "", false)
395395
if err != nil {
396396
t.Fatal(err)
397397
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts
105105
w.RLock()
106106
defer w.RUnlock()
107107

108-
return w.getAllEventsSinceLocked(resourceVersion, opts)
108+
return w.getAllEventsSinceLocked(resourceVersion, "", opts)
109109
}
110110

111111
// newTestWatchCache just adds a fake clock.

staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set)
118118
// MatchesSingleNamespace will return (namespace, true) if and only if s.Field matches on the object's
119119
// namespace.
120120
func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
121-
if len(s.Continue) > 0 {
121+
if len(s.Continue) > 0 || s.Field == nil {
122122
return "", false
123123
}
124124
if namespace, ok := s.Field.RequiresExactMatch("metadata.namespace"); ok {
@@ -130,7 +130,7 @@ func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
130130
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
131131
// name.
132132
func (s *SelectionPredicate) MatchesSingle() (string, bool) {
133-
if len(s.Continue) > 0 {
133+
if len(s.Continue) > 0 || s.Field == nil {
134134
return "", false
135135
}
136136
// TODO: should be namespace.name

0 commit comments

Comments
 (0)