Commit 56407b6

Use dynamic size watch-cache.
If all cached events occurred within eventFreshDuration, increase the cache capacity by 2x. Decrease the cache capacity by 2x when the most recent half of the events occurred outside eventFreshDuration.
1 parent 8fa7931 commit 56407b6
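
To illustrate the heuristic before reading the diff, here is a small standalone Go sketch of the sizing rule as implemented in resizeCacheLocked below (a toy model only, not the cacher code: the nextCapacity helper is hypothetical, and the bounds are hard-coded copies of the defaults added in watch_cache.go):

package main

import (
	"fmt"
	"time"
)

const (
	eventFreshDuration = 5 * time.Minute // history we want to keep
	lowerBound         = 100             // mirrors defaultLowerBoundCapacity
	upperBound         = 100 * 1024      // mirrors defaultUpperBoundCapacity
)

// nextCapacity applies the resize rule to a full cache:
// double (up to upperBound) while even the oldest cached event is still
// younger than eventFreshDuration; halve (down to lowerBound) once the
// event a quarter of the window back from the newest end is already
// older than eventFreshDuration.
func nextCapacity(capacity int, oldestAge, quarterAge time.Duration) int {
	if oldestAge < eventFreshDuration {
		if c := capacity * 2; c <= upperBound {
			return c
		}
		return upperBound
	}
	if quarterAge > eventFreshDuration {
		if c := capacity / 2; c >= lowerBound {
			return c
		}
		return lowerBound
	}
	return capacity
}

func main() {
	// Busy resource: the whole window turned over in 30s, so grow.
	fmt.Println(nextCapacity(100, 30*time.Second, 10*time.Second)) // 200
	// Quiet resource: even recent events are stale, so shrink.
	fmt.Println(nextCapacity(400, 20*time.Minute, 6*time.Minute)) // 200
}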

File tree

6 files changed: +426 -14 lines

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 3 additions & 2 deletions
@@ -315,10 +315,11 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 	}
 
 	clock := clock.RealClock{}
+	objType := reflect.TypeOf(obj)
 	cacher := &Cacher{
 		ready:          newReady(),
 		storage:        config.Storage,
-		objectType:     reflect.TypeOf(obj),
+		objectType:     objType,
 		versioner:      config.Versioner,
 		newFunc:        config.NewFunc,
 		indexedTrigger: indexedTrigger,
@@ -349,7 +350,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 	}
 
 	watchCache := newWatchCache(
-		config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers)
+		config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, objType)
 	listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
 	reflectorName := "storage/cacher.go:" + config.ResourcePrefix

staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics.go

Lines changed: 31 additions & 2 deletions
@@ -1,5 +1,5 @@
 /*
-Copyright 2019 The Kubernetes Authors.
+Copyright 2020 The Kubernetes Authors.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -33,7 +33,25 @@ var (
 	initCounter = metrics.NewCounterVec(
 		&metrics.CounterOpts{
 			Name:           "apiserver_init_events_total",
-			Help:           "Counter of init events processed in watchcache broken by resource type",
+			Help:           "Counter of init events processed in watchcache broken by resource type.",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"resource"},
+	)
+
+	watchCacheCapacityIncreaseTotal = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Name:           "watch_cache_capacity_increase_total",
+			Help:           "Total number of watch cache capacity increase events broken by resource type.",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"resource"},
+	)
+
+	watchCacheCapacityDecreaseTotal = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Name:           "watch_cache_capacity_decrease_total",
+			Help:           "Total number of watch cache capacity decrease events broken by resource type.",
 			StabilityLevel: metrics.ALPHA,
 		},
 		[]string{"resource"},
@@ -42,4 +60,15 @@ var (
 
 func init() {
 	legacyregistry.MustRegister(initCounter)
+	legacyregistry.MustRegister(watchCacheCapacityIncreaseTotal)
+	legacyregistry.MustRegister(watchCacheCapacityDecreaseTotal)
+}
+
+// recordsWatchCacheCapacityChange records watchCache capacity resize (increase or decrease) operations.
+func recordsWatchCacheCapacityChange(objType string, old, new int) {
+	if old < new {
+		watchCacheCapacityIncreaseTotal.WithLabelValues(objType).Inc()
+		return
+	}
+	watchCacheCapacityDecreaseTotal.WithLabelValues(objType).Inc()
 }
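
For a sense of how these counters behave, here is a minimal standalone sketch using the plain Prometheus Go client rather than the component-base wrappers (the metric names are taken from the diff above; recordCapacityChange and the "*core.Pod" label value are illustrative stand-ins):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var (
	increaseTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "watch_cache_capacity_increase_total",
			Help: "Total number of watch cache capacity increase events broken by resource type.",
		},
		[]string{"resource"},
	)
	decreaseTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "watch_cache_capacity_decrease_total",
			Help: "Total number of watch cache capacity decrease events broken by resource type.",
		},
		[]string{"resource"},
	)
)

// recordCapacityChange mirrors recordsWatchCacheCapacityChange: a resize is
// classified as an increase or a decrease purely by comparing old and new.
func recordCapacityChange(resource string, old, new int) {
	if old < new {
		increaseTotal.WithLabelValues(resource).Inc()
		return
	}
	decreaseTotal.WithLabelValues(resource).Inc()
}

func main() {
	recordCapacityChange("*core.Pod", 100, 200) // growth  -> increase counter
	recordCapacityChange("*core.Pod", 200, 100) // shrink  -> decrease counter

	fmt.Println(testutil.ToFloat64(increaseTotal.WithLabelValues("*core.Pod"))) // 1
	fmt.Println(testutil.ToFloat64(decreaseTotal.WithLabelValues("*core.Pod"))) // 1
}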

staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go

Lines changed: 14 additions & 0 deletions
@@ -44,3 +44,17 @@ func hasPathPrefix(s, pathPrefix string) bool {
 	}
 	return false
 }
+
+func max(a, b int) int {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 80 additions & 6 deletions
@@ -18,6 +18,7 @@ package cacher
 
 import (
 	"fmt"
+	"reflect"
 	"sort"
 	"sync"
 	"time"
@@ -44,6 +45,20 @@ const (
 	// resourceVersionTooHighRetrySeconds is the seconds before a operation should be retried by the client
 	// after receiving a 'too high resource version' error.
 	resourceVersionTooHighRetrySeconds = 1
+
+	// eventFreshDuration is the time duration of events we want to keep.
+	eventFreshDuration = 5 * time.Minute
+
+	// defaultLowerBoundCapacity is a default value for the event cache capacity's lower bound.
+	// 100 is the minimum in NewHeuristicWatchCacheSizes.
+	// TODO: Figure out to what value we can decrease it.
+	defaultLowerBoundCapacity = 100
+
+	// defaultUpperBoundCapacity should be able to keep eventFreshDuration of history.
+	// With the current 102400 value though, it is not enough for leases in a 5k-node cluster,
+	// but that is a conscious decision.
+	// TODO: Validate if the current value is high enough for large scale clusters.
+	defaultUpperBoundCapacity = 100 * 1024
 )
 
 // watchCacheEvent is a single "watch event" that is send to users of
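
For scale context on defaultUpperBoundCapacity: assuming the default node-lease renew interval of roughly 10 seconds, a 5,000-node cluster produces about 500 lease updates per second, i.e. around 500 x 300 s = 150,000 events per eventFreshDuration window, which is indeed more than the 102,400 upper bound acknowledged in the comment above.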
@@ -60,6 +75,7 @@ type watchCacheEvent struct {
 	PrevObjFields   fields.Set
 	Key             string
 	ResourceVersion uint64
+	RecordTime      time.Time
 }
 
 // Computing a key of an object is generally non-trivial (it performs
@@ -126,6 +142,12 @@ type watchCache struct {
 	// Maximum size of history window.
 	capacity int
 
+	// upper bound of capacity since event cache has a dynamic size.
+	upperBoundCapacity int
+
+	// lower bound of capacity since event cache has a dynamic size.
+	lowerBoundCapacity int
+
 	// keyFunc is used to get a key in the underlying storage for a given object.
 	keyFunc func(runtime.Object) (string, error)
@@ -165,6 +187,9 @@
 
 	// An underlying storage.Versioner.
 	versioner storage.Versioner
+
+	// cacher's objectType.
+	objectType reflect.Type
 }
 
 func newWatchCache(
@@ -173,12 +198,16 @@ func newWatchCache(
 	eventHandler func(*watchCacheEvent),
 	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
 	versioner storage.Versioner,
-	indexers *cache.Indexers) *watchCache {
+	indexers *cache.Indexers,
+	objectType reflect.Type) *watchCache {
 	wc := &watchCache{
-		capacity:     capacity,
-		keyFunc:      keyFunc,
-		getAttrsFunc: getAttrsFunc,
-		cache:        make([]*watchCacheEvent, capacity),
+		capacity:     capacity,
+		keyFunc:      keyFunc,
+		getAttrsFunc: getAttrsFunc,
+		cache:        make([]*watchCacheEvent, capacity),
+		// TODO get rid of them once we stop passing capacity as a parameter to watch cache.
+		lowerBoundCapacity: min(capacity, defaultLowerBoundCapacity),
+		upperBoundCapacity: max(capacity, defaultUpperBoundCapacity),
 		startIndex:   0,
 		endIndex:     0,
 		store:        cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
@@ -187,6 +216,7 @@
 		eventHandler: eventHandler,
 		clock:        clock.RealClock{},
 		versioner:    versioner,
+		objectType:   objectType,
 	}
 	wc.cond = sync.NewCond(wc.RLocker())
 	return wc
@@ -260,6 +290,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
 		ObjFields:       elem.Fields,
 		Key:             key,
 		ResourceVersion: resourceVersion,
+		RecordTime:      w.clock.Now(),
 	}
 
 	if err := func() error {
@@ -301,14 +332,57 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
 
 // Assumes that lock is already held for write.
 func (w *watchCache) updateCache(event *watchCacheEvent) {
-	if w.endIndex == w.startIndex+w.capacity {
+	w.resizeCacheLocked(event.RecordTime)
+	if w.isCacheFullLocked() {
 		// Cache is full - remove the oldest element.
 		w.startIndex++
 	}
 	w.cache[w.endIndex%w.capacity] = event
 	w.endIndex++
 }
 
+// resizeCacheLocked resizes the cache if necessary:
+// - increases capacity by 2x if the cache is full and all cached events occurred within the last eventFreshDuration.
+// - decreases capacity by 2x when the recent quarter of events occurred outside of eventFreshDuration (protects watchCache from flapping).
+func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
+	if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
+		capacity := min(w.capacity*2, w.upperBoundCapacity)
+		if capacity > w.capacity {
+			w.doCacheResizeLocked(capacity)
+		}
+		return
+	}
+	if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
+		capacity := max(w.capacity/2, w.lowerBoundCapacity)
+		if capacity < w.capacity {
+			w.doCacheResizeLocked(capacity)
+		}
+		return
+	}
+}
+
+// isCacheFullLocked is used to judge whether the event cache is full.
+// Assumes that lock is already held for write.
+func (w *watchCache) isCacheFullLocked() bool {
+	return w.endIndex == w.startIndex+w.capacity
+}
+
+// doCacheResizeLocked resizes watchCache's event array to the given capacity.
+// Assumes that lock is already held for write.
+func (w *watchCache) doCacheResizeLocked(capacity int) {
+	newCache := make([]*watchCacheEvent, capacity)
+	if capacity < w.capacity {
+		// adjust startIndex if cache capacity shrinks.
+		w.startIndex = w.endIndex - capacity
+	}
+	for i := w.startIndex; i < w.endIndex; i++ {
+		newCache[i%capacity] = w.cache[i%w.capacity]
+	}
+	w.cache = newCache
+	recordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity)
+	w.capacity = capacity
+}
+
 // List returns list of pointers to <storeElement> objects.
 func (w *watchCache) List() []interface{} {
 	return w.store.List()
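
To make the index bookkeeping in doCacheResizeLocked concrete, here is a standalone sketch of the same ring-buffer copy with plain ints instead of *watchCacheEvent (the resize helper and its sample values are illustrative):

package main

import "fmt"

// resize mirrors the copy loop in doCacheResizeLocked: logical indices grow
// monotonically and are reduced modulo the backing array's length, so the
// same loop handles both growing and shrinking. When shrinking, startIndex
// is advanced first so that only the newest events are copied.
func resize(cache []int, startIndex, endIndex, newCapacity int) ([]int, int) {
	newCache := make([]int, newCapacity)
	if newCapacity < len(cache) {
		startIndex = endIndex - newCapacity
	}
	for i := startIndex; i < endIndex; i++ {
		newCache[i%newCapacity] = cache[i%len(cache)]
	}
	return newCache, startIndex
}

func main() {
	// Capacity 4 holding events 10..13 at logical indices 6..9;
	// in ring layout that is [12 13 10 11].
	old := []int{12, 13, 10, 11}
	newCache, start := resize(old, 6, 10, 2)
	fmt.Println(newCache, start) // [12 13] 8 -> only the two newest events survive
}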
