Skip to content

Commit 2de2093

Browse files
committed
Add snapshotting of watch cache behind a feature gate
1 parent 9988145 commit 2de2093

File tree

8 files changed

+329
-1
lines changed

8 files changed

+329
-1
lines changed

pkg/features/versioned_kube_features.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
295295
{Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Deprecated},
296296
},
297297

298+
genericfeatures.ListFromCacheSnapshot: {
299+
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha},
300+
},
301+
298302
genericfeatures.MutatingAdmissionPolicy: {
299303
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
300304
},

staging/src/k8s.io/apiserver/pkg/features/kube_features.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@ const (
123123
// Enables KMS v1 API for encryption at rest.
124124
KMSv1 featuregate.Feature = "KMSv1"
125125

126+
// owner: @serathius
127+
// kep: https://kep.k8s.io/4988
128+
//
129+
// Enables generating snapshots of watch cache store and using them to serve LIST requests.
130+
ListFromCacheSnapshot featuregate.Feature = "ListFromCacheSnapshot"
131+
126132
// owner: @alexzielenski, @cici37, @jiahuif, @jpbetz
127133
// kep: https://kep.k8s.io/3962
128134
//
@@ -334,6 +340,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
334340
{Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Deprecated},
335341
},
336342

343+
ListFromCacheSnapshot: {
344+
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha},
345+
},
346+
337347
MutatingAdmissionPolicy: {
338348
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
339349
},

staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ type storeIndexer interface {
7474

7575
type orderedLister interface {
7676
ListPrefix(prefix, continueKey string, limit int) (items []interface{}, hasMore bool)
77+
Count(prefix, continueKey string) (count int)
78+
Clone() orderedLister
7779
}
7880

7981
func newStoreIndexer(indexers *cache.Indexers) storeIndexer {

staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@ type threadedStoreIndexer struct {
4646

4747
var _ orderedLister = (*threadedStoreIndexer)(nil)
4848

49+
func (si *threadedStoreIndexer) Count(prefix, continueKey string) (count int) {
50+
si.lock.RLock()
51+
defer si.lock.RUnlock()
52+
return si.store.Count(prefix, continueKey)
53+
}
54+
55+
func (si *threadedStoreIndexer) Clone() orderedLister {
56+
si.lock.RLock()
57+
defer si.lock.RUnlock()
58+
return si.store.Clone()
59+
}
60+
4961
func (si *threadedStoreIndexer) Add(obj interface{}) error {
5062
return si.addOrUpdate(obj)
5163
}
@@ -140,6 +152,12 @@ type btreeStore struct {
140152
tree *btree.BTreeG[*storeElement]
141153
}
142154

155+
func (s *btreeStore) Clone() orderedLister {
156+
return &btreeStore{
157+
tree: s.tree.Clone(),
158+
}
159+
}
160+
143161
func (s *btreeStore) Add(obj interface{}) error {
144162
if obj == nil {
145163
return fmt.Errorf("obj cannot be nil")
@@ -387,3 +405,97 @@ func (i *indexer) delete(key, value string, index map[string]map[string]*storeEl
387405
delete(index, value)
388406
}
389407
}
408+
409+
// newStoreSnapshotter returns a storeSnapshotter that stores snapshots for
410+
// serving read requests with exact resource versions (RV) and pagination.
411+
//
412+
// Snapshots are created by calling Clone method on orderedLister, which is
413+
// expected to be fast and efficient thanks to usage of B-trees.
414+
// B-trees can create a lazy copy of the tree structure, minimizing overhead.
415+
//
416+
// Assuming the watch cache observes all events and snapshots cache after each of them,
417+
// requests for a specific resource version can be served by retrieving
418+
// the snapshot with the greatest RV less than or equal to the requested RV.
419+
// To make snapshot retrivial efficient we need an ordered data structure, such as tree.
420+
//
421+
// The initial implementation uses a B-tree to achieve the following performance characteristics (n - number of snapshots stored):
422+
// - `Add`: Adds a new snapshot.
423+
// Complexity: O(log n).
424+
// Executed for each watch event observed by the cache.
425+
// - `GetLessOrEqual`: Retrieves the snapshot with the greatest RV less than or equal to the requested RV.
426+
// Complexity: O(log n).
427+
// Executed for each LIST request with match=Exact or continuation.
428+
// - `RemoveLess`: Cleans up snapshots outside the watch history window.
429+
// Complexity: O(k log n), k - number of snapshots to remove, usually only one if watch capacity was not reduced.
430+
// Executed per watch event observed when the cache is full.
431+
// - `Reset`: Cleans up all snapshots.
432+
// Complexity: O(1).
433+
// Executed when the watch cache is reinitialized.
434+
//
435+
// Further optimization is possible by leveraging the property that adds always
436+
// increase the maximum RV and deletes only increase the minimum RV.
437+
// For example, a binary search on a cyclic buffer of (RV, snapshot)
438+
// should reduce number of allocations and improve removal complexity.
439+
// However, this solution is more complex and is deferred for future implementation.
440+
//
441+
// TODO: Rewrite to use a cyclic buffer
442+
func newStoreSnapshotter() *storeSnapshotter {
443+
s := &storeSnapshotter{
444+
snapshots: btree.NewG[rvSnapshot](btreeDegree, func(a, b rvSnapshot) bool {
445+
return a.resourceVersion < b.resourceVersion
446+
}),
447+
}
448+
return s
449+
}
450+
451+
type storeSnapshotter struct {
452+
mux sync.RWMutex
453+
snapshots *btree.BTreeG[rvSnapshot]
454+
}
455+
456+
type rvSnapshot struct {
457+
resourceVersion uint64
458+
snapshot orderedLister
459+
}
460+
461+
func (s *storeSnapshotter) Reset() {
462+
s.mux.Lock()
463+
defer s.mux.Unlock()
464+
s.snapshots.Clear(false)
465+
}
466+
467+
func (s *storeSnapshotter) GetLessOrEqual(rv uint64) (orderedLister, bool) {
468+
s.mux.RLock()
469+
defer s.mux.RUnlock()
470+
471+
var result *rvSnapshot
472+
s.snapshots.DescendLessOrEqual(rvSnapshot{resourceVersion: rv}, func(rvs rvSnapshot) bool {
473+
result = &rvs
474+
return false
475+
})
476+
if result == nil {
477+
return nil, false
478+
}
479+
return result.snapshot, true
480+
}
481+
482+
func (s *storeSnapshotter) Add(rv uint64, indexer orderedLister) {
483+
s.mux.Lock()
484+
defer s.mux.Unlock()
485+
s.snapshots.ReplaceOrInsert(rvSnapshot{resourceVersion: rv, snapshot: indexer.Clone()})
486+
}
487+
488+
func (s *storeSnapshotter) RemoveLess(rv uint64) {
489+
s.mux.Lock()
490+
defer s.mux.Unlock()
491+
for s.snapshots.Len() > 0 {
492+
oldest, ok := s.snapshots.Min()
493+
if !ok {
494+
break
495+
}
496+
if rv <= oldest.resourceVersion {
497+
break
498+
}
499+
s.snapshots.DeleteMin()
500+
}
501+
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,71 @@ func TestStoreListPrefix(t *testing.T) {
7979
testStorageElement("bar", "baz", 4),
8080
}, items)
8181
}
82+
83+
func TestStoreSnapshotter(t *testing.T) {
84+
cache := newStoreSnapshotter()
85+
cache.Add(10, fakeOrderedLister{rv: 10})
86+
cache.Add(20, fakeOrderedLister{rv: 20})
87+
cache.Add(30, fakeOrderedLister{rv: 30})
88+
cache.Add(40, fakeOrderedLister{rv: 40})
89+
assert.Equal(t, 4, cache.snapshots.Len())
90+
91+
t.Log("No snapshot from before first RV")
92+
_, found := cache.GetLessOrEqual(9)
93+
assert.False(t, found)
94+
95+
t.Log("Get snapshot from first RV")
96+
snapshot, found := cache.GetLessOrEqual(10)
97+
assert.True(t, found)
98+
assert.Equal(t, 10, snapshot.(fakeOrderedLister).rv)
99+
100+
t.Log("Get first snapshot by larger RV")
101+
snapshot, found = cache.GetLessOrEqual(11)
102+
assert.True(t, found)
103+
assert.Equal(t, 10, snapshot.(fakeOrderedLister).rv)
104+
105+
t.Log("Get second snapshot by larger RV")
106+
snapshot, found = cache.GetLessOrEqual(22)
107+
assert.True(t, found)
108+
assert.Equal(t, 20, snapshot.(fakeOrderedLister).rv)
109+
110+
t.Log("Get third snapshot for future revision")
111+
snapshot, found = cache.GetLessOrEqual(100)
112+
assert.True(t, found)
113+
assert.Equal(t, 40, snapshot.(fakeOrderedLister).rv)
114+
115+
t.Log("Remove snapshot less than 30")
116+
cache.RemoveLess(30)
117+
118+
assert.Equal(t, 2, cache.snapshots.Len())
119+
_, found = cache.GetLessOrEqual(10)
120+
assert.False(t, found)
121+
122+
_, found = cache.GetLessOrEqual(20)
123+
assert.False(t, found)
124+
125+
snapshot, found = cache.GetLessOrEqual(30)
126+
assert.True(t, found)
127+
assert.Equal(t, 30, snapshot.(fakeOrderedLister).rv)
128+
129+
t.Log("Remove removing all RVs")
130+
cache.Reset()
131+
assert.Equal(t, 0, cache.snapshots.Len())
132+
_, found = cache.GetLessOrEqual(30)
133+
assert.False(t, found)
134+
_, found = cache.GetLessOrEqual(40)
135+
assert.False(t, found)
136+
}
137+
138+
type fakeOrderedLister struct {
139+
rv int
140+
}
141+
142+
func (f fakeOrderedLister) Add(obj interface{}) error { return nil }
143+
func (f fakeOrderedLister) Update(obj interface{}) error { return nil }
144+
func (f fakeOrderedLister) Delete(obj interface{}) error { return nil }
145+
func (f fakeOrderedLister) Clone() orderedLister { return f }
146+
func (f fakeOrderedLister) ListPrefix(prefixKey, continueKey string, limit int) ([]interface{}, bool) {
147+
return nil, false
148+
}
149+
func (f fakeOrderedLister) Count(prefixKey, continueKey string) int { return 0 }

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ type watchCache struct {
151151
// Requests progress notification if there are requests waiting for watch
152152
// to be fresh
153153
waitingUntilFresh *conditionalProgressRequester
154+
155+
// Stores previous snapshots of orderedLister to allow serving requests from previous revisions.
156+
snapshots *storeSnapshotter
154157
}
155158

156159
func newWatchCache(
@@ -182,6 +185,9 @@ func newWatchCache(
182185
groupResource: groupResource,
183186
waitingUntilFresh: progressRequester,
184187
}
188+
if utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
189+
wc.snapshots = newStoreSnapshotter()
190+
}
185191
metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
186192
wc.cond = sync.NewCond(wc.RLocker())
187193
wc.indexValidator = wc.isIndexValidLocked
@@ -286,7 +292,20 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
286292
w.resourceVersion = resourceVersion
287293
defer w.cond.Broadcast()
288294

289-
return updateFunc(elem)
295+
err := updateFunc(elem)
296+
if err != nil {
297+
return err
298+
}
299+
if w.snapshots != nil {
300+
if orderedLister, ordered := w.store.(orderedLister); ordered {
301+
if w.isCacheFullLocked() {
302+
oldestRV := w.cache[w.startIndex%w.capacity].ResourceVersion
303+
w.snapshots.RemoveLess(oldestRV)
304+
}
305+
w.snapshots.Add(w.resourceVersion, orderedLister)
306+
}
307+
}
308+
return err
290309
}(); err != nil {
291310
return err
292311
}
@@ -601,6 +620,12 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
601620
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
602621
return err
603622
}
623+
if w.snapshots != nil {
624+
w.snapshots.Reset()
625+
if orderedLister, ordered := w.store.(orderedLister); ordered {
626+
w.snapshots.Add(version, orderedLister)
627+
}
628+
}
604629
w.listResourceVersion = version
605630
w.resourceVersion = version
606631
if w.onReplace != nil {

0 commit comments

Comments
 (0)