Skip to content

Commit 107be8f

Browse files
authored
Merge pull request kubernetes#129205 from tosi3k/wc-configurable-retention
Configure watch cache history window based on request timeout
2 parents bb24538 + 4a2b7ee commit 107be8f

File tree

13 files changed

+279
-198
lines changed

13 files changed

+279
-198
lines changed

cmd/kube-apiserver/app/options/options_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ func TestAddFlags(t *testing.T) {
175175
CompactionInterval: storagebackend.DefaultCompactInterval,
176176
CountMetricPollPeriod: time.Minute,
177177
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
178+
EventsHistoryWindow: storagebackend.DefaultEventsHistoryWindow,
178179
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
179180
ReadycheckTimeout: storagebackend.DefaultReadinessTimeout,
180181
LeaseManagerConfig: etcd3.LeaseManagerConfig{

pkg/controlplane/apiserver/options/options.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,16 @@ func (o *Options) Complete(ctx context.Context, fss cliflag.NamedFlagSets, alter
225225
return CompletedOptions{}, fmt.Errorf("error creating self-signed certificates: %v", err)
226226
}
227227

228+
if o.GenericServerRunOptions.RequestTimeout > 0 {
229+
// Setting the EventsHistoryWindow as a maximum of the value set in the
230+
// watchcache-specific options and the value of the request timeout plus
231+
// some epsilon.
232+
// This is done to make sure that the list+watch pattern can still be
233+
// usable in large clusters with the elevated request timeout where the
234+
// initial list can take a considerable amount of time.
235+
completed.Etcd.StorageConfig.EventsHistoryWindow = max(completed.Etcd.StorageConfig.EventsHistoryWindow, completed.GenericServerRunOptions.RequestTimeout+15*time.Second)
236+
}
237+
228238
if len(completed.GenericServerRunOptions.ExternalHost) == 0 {
229239
if len(completed.GenericServerRunOptions.AdvertiseAddress) > 0 {
230240
completed.GenericServerRunOptions.ExternalHost = completed.GenericServerRunOptions.AdvertiseAddress.String()

pkg/controlplane/apiserver/options/options_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ func TestAddFlags(t *testing.T) {
167167
CompactionInterval: storagebackend.DefaultCompactInterval,
168168
CountMetricPollPeriod: time.Minute,
169169
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
170+
EventsHistoryWindow: storagebackend.DefaultEventsHistoryWindow,
170171
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
171172
ReadycheckTimeout: storagebackend.DefaultReadinessTimeout,
172173
LeaseManagerConfig: etcd3.LeaseManagerConfig{

staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import (
5858
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
5959
"k8s.io/apiserver/pkg/registry/rest"
6060
"k8s.io/apiserver/pkg/server/options"
61+
"k8s.io/apiserver/pkg/storage/cacher"
6162
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
6263
"k8s.io/apiserver/pkg/util/webhook"
6364
"k8s.io/client-go/tools/cache"
@@ -484,6 +485,10 @@ func testHandlerConversion(t *testing.T, enableWatchCache bool) {
484485
t.Fatal(err)
485486
}
486487

488+
if enableWatchCache {
489+
storageConfig.EventsHistoryWindow = cacher.DefaultEventFreshDuration
490+
}
491+
487492
etcdOptions := options.NewEtcdOptions(storageConfig)
488493
etcdOptions.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
489494
restOptionsGetter := generic.RESTOptions{

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,18 @@ func StorageWithCacher() generic.StorageDecorator {
5454
}
5555

5656
cacherConfig := cacherstorage.Config{
57-
Storage: s,
58-
Versioner: storage.APIObjectVersioner{},
59-
GroupResource: storageConfig.GroupResource,
60-
ResourcePrefix: resourcePrefix,
61-
KeyFunc: keyFunc,
62-
NewFunc: newFunc,
63-
NewListFunc: newListFunc,
64-
GetAttrsFunc: getAttrsFunc,
65-
IndexerFuncs: triggerFuncs,
66-
Indexers: indexers,
67-
Codec: storageConfig.Codec,
57+
Storage: s,
58+
Versioner: storage.APIObjectVersioner{},
59+
GroupResource: storageConfig.GroupResource,
60+
EventsHistoryWindow: storageConfig.EventsHistoryWindow,
61+
ResourcePrefix: resourcePrefix,
62+
KeyFunc: keyFunc,
63+
NewFunc: newFunc,
64+
NewListFunc: newListFunc,
65+
GetAttrsFunc: getAttrsFunc,
66+
IndexerFuncs: triggerFuncs,
67+
Indexers: indexers,
68+
Codec: storageConfig.Codec,
6869
}
6970
cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
7071
if err != nil {

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2435,15 +2435,16 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
24352435
}
24362436
if hasCacheEnabled {
24372437
config := cacherstorage.Config{
2438-
Storage: s,
2439-
Versioner: storage.APIObjectVersioner{},
2440-
GroupResource: schema.GroupResource{Resource: "pods"},
2441-
ResourcePrefix: podPrefix,
2442-
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
2443-
GetAttrsFunc: getPodAttrs,
2444-
NewFunc: newFunc,
2445-
NewListFunc: newListFunc,
2446-
Codec: sc.Codec,
2438+
Storage: s,
2439+
Versioner: storage.APIObjectVersioner{},
2440+
GroupResource: schema.GroupResource{Resource: "pods"},
2441+
EventsHistoryWindow: cacherstorage.DefaultEventFreshDuration,
2442+
ResourcePrefix: podPrefix,
2443+
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
2444+
GetAttrsFunc: getPodAttrs,
2445+
NewFunc: newFunc,
2446+
NewListFunc: newListFunc,
2447+
Codec: sc.Codec,
24472448
}
24482449
cacher, err := cacherstorage.NewCacherFromConfig(config)
24492450
if err != nil {

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,16 @@ const (
6161
// storageWatchListPageSize is the cacher's request chunk size of
6262
// initial and resync watch lists to storage.
6363
storageWatchListPageSize = int64(10000)
64+
65+
// DefaultEventFreshDuration is the default time duration of events
66+
// we want to keep.
67+
// We set it to defaultBookmarkFrequency plus epsilon to maximize
68+
// chances that last bookmark was sent within kept history, at the
69+
// same time, minimizing the needed memory usage.
70+
DefaultEventFreshDuration = defaultBookmarkFrequency + 15*time.Second
71+
6472
// defaultBookmarkFrequency defines how frequently watch bookmarks should be sent
6573
// in addition to sending a bookmark right before watch deadline.
66-
//
67-
// NOTE: Update `eventFreshDuration` when changing this value.
6874
defaultBookmarkFrequency = time.Minute
6975
)
7076

@@ -80,6 +86,10 @@ type Config struct {
8086
// and metrics.
8187
GroupResource schema.GroupResource
8288

89+
// EventsHistoryWindow specifies the minimum history duration that the storage keeps.
90+
// If lower than DefaultEventFreshDuration, the cache creation will fail.
91+
EventsHistoryWindow time.Duration
92+
8393
// The Cache will be caching objects of a given Type and assumes that they
8494
// are all stored under ResourcePrefix directory in the underlying database.
8595
ResourcePrefix string
@@ -409,9 +419,15 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
409419
contextMetadata = metadata.New(map[string]string{"source": "cache"})
410420
}
411421

422+
eventFreshDuration := config.EventsHistoryWindow
423+
if eventFreshDuration < DefaultEventFreshDuration {
424+
return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration)
425+
}
426+
412427
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
413428
watchCache := newWatchCache(
414-
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
429+
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers,
430+
config.Clock, eventFreshDuration, config.GroupResource, progressRequester)
415431
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
416432
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
417433

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -470,18 +470,19 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
470470
}
471471

472472
config := Config{
473-
Storage: wrappedStorage,
474-
Versioner: storage.APIObjectVersioner{},
475-
GroupResource: schema.GroupResource{Resource: "pods"},
476-
ResourcePrefix: setupOpts.resourcePrefix,
477-
KeyFunc: setupOpts.keyFunc,
478-
GetAttrsFunc: GetPodAttrs,
479-
NewFunc: newPod,
480-
NewListFunc: newPodList,
481-
IndexerFuncs: setupOpts.indexerFuncs,
482-
Indexers: &setupOpts.indexers,
483-
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
484-
Clock: setupOpts.clock,
473+
Storage: wrappedStorage,
474+
Versioner: storage.APIObjectVersioner{},
475+
GroupResource: schema.GroupResource{Resource: "pods"},
476+
EventsHistoryWindow: DefaultEventFreshDuration,
477+
ResourcePrefix: setupOpts.resourcePrefix,
478+
KeyFunc: setupOpts.keyFunc,
479+
GetAttrsFunc: GetPodAttrs,
480+
NewFunc: newPod,
481+
NewListFunc: newPodList,
482+
IndexerFuncs: setupOpts.indexerFuncs,
483+
Indexers: &setupOpts.indexers,
484+
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
485+
Clock: setupOpts.clock,
485486
}
486487
cacher, err := NewCacherFromConfig(config)
487488
if err != nil {

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,12 @@ import (
6262
func newTestCacherWithoutSyncing(s storage.Interface) (*Cacher, storage.Versioner, error) {
6363
prefix := "pods"
6464
config := Config{
65-
Storage: s,
66-
Versioner: storage.APIObjectVersioner{},
67-
GroupResource: schema.GroupResource{Resource: "pods"},
68-
ResourcePrefix: prefix,
69-
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
65+
Storage: s,
66+
Versioner: storage.APIObjectVersioner{},
67+
GroupResource: schema.GroupResource{Resource: "pods"},
68+
EventsHistoryWindow: DefaultEventFreshDuration,
69+
ResourcePrefix: prefix,
70+
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
7071
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) {
7172
pod, ok := obj.(*example.Pod)
7273
if !ok {
@@ -2722,17 +2723,18 @@ func TestWatchStreamSeparation(t *testing.T) {
27222723
setupOpts := &setupOptions{}
27232724
withDefaults(setupOpts)
27242725
config := Config{
2725-
Storage: etcdStorage,
2726-
Versioner: storage.APIObjectVersioner{},
2727-
GroupResource: schema.GroupResource{Resource: "pods"},
2728-
ResourcePrefix: setupOpts.resourcePrefix,
2729-
KeyFunc: setupOpts.keyFunc,
2730-
GetAttrsFunc: GetPodAttrs,
2731-
NewFunc: newPod,
2732-
NewListFunc: newPodList,
2733-
IndexerFuncs: setupOpts.indexerFuncs,
2734-
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
2735-
Clock: setupOpts.clock,
2726+
Storage: etcdStorage,
2727+
Versioner: storage.APIObjectVersioner{},
2728+
GroupResource: schema.GroupResource{Resource: "pods"},
2729+
EventsHistoryWindow: DefaultEventFreshDuration,
2730+
ResourcePrefix: setupOpts.resourcePrefix,
2731+
KeyFunc: setupOpts.keyFunc,
2732+
GetAttrsFunc: GetPodAttrs,
2733+
NewFunc: newPod,
2734+
NewListFunc: newPodList,
2735+
IndexerFuncs: setupOpts.indexerFuncs,
2736+
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
2737+
Clock: setupOpts.clock,
27362738
}
27372739
tcs := []struct {
27382740
name string

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,11 @@ const (
5252
// after receiving a 'too high resource version' error.
5353
resourceVersionTooHighRetrySeconds = 1
5454

55-
// eventFreshDuration is time duration of events we want to keep.
56-
// We set it to `defaultBookmarkFrequency` plus epsilon to maximize
57-
// chances that last bookmark was sent within kept history, at the
58-
// same time, minimizing the needed memory usage.
59-
eventFreshDuration = 75 * time.Second
60-
6155
// defaultLowerBoundCapacity is a default value for event cache capacity's lower bound.
6256
// TODO: Figure out to what value we can decrease it.
6357
defaultLowerBoundCapacity = 100
6458

65-
// defaultUpperBoundCapacity should be able to keep eventFreshDuration of history.
59+
// defaultUpperBoundCapacity should be able to keep the required history.
6660
defaultUpperBoundCapacity = 100 * 1024
6761
)
6862

@@ -142,6 +136,9 @@ type watchCache struct {
142136
// for testing timeouts.
143137
clock clock.Clock
144138

139+
// eventFreshDuration defines the minimum watch history watchcache will store.
140+
eventFreshDuration time.Duration
141+
145142
// An underlying storage.Versioner.
146143
versioner storage.Versioner
147144

@@ -163,6 +160,7 @@ func newWatchCache(
163160
versioner storage.Versioner,
164161
indexers *cache.Indexers,
165162
clock clock.WithTicker,
163+
eventFreshDuration time.Duration,
166164
groupResource schema.GroupResource,
167165
progressRequester *conditionalProgressRequester) *watchCache {
168166
wc := &watchCache{
@@ -179,6 +177,7 @@ func newWatchCache(
179177
listResourceVersion: 0,
180178
eventHandler: eventHandler,
181179
clock: clock,
180+
eventFreshDuration: eventFreshDuration,
182181
versioner: versioner,
183182
groupResource: groupResource,
184183
waitingUntilFresh: progressRequester,
@@ -319,14 +318,14 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
319318
// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration.
320319
// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration (protects watchCache from flapping).
321320
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
322-
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
321+
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < w.eventFreshDuration {
323322
capacity := min(w.capacity*2, w.upperBoundCapacity)
324323
if capacity > w.capacity {
325324
w.doCacheResizeLocked(capacity)
326325
}
327326
return
328327
}
329-
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
328+
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > w.eventFreshDuration {
330329
capacity := max(w.capacity/2, w.lowerBoundCapacity)
331330
if capacity < w.capacity {
332331
w.doCacheResizeLocked(capacity)
@@ -660,7 +659,7 @@ func (w *watchCache) suggestedWatchChannelSize(indexExists, triggerUsed bool) in
660659
// We don't have exact data, but given we store updates from
661660
// the last <eventFreshDuration>, we approximate it by dividing the
662661
// capacity by the length of the history window.
663-
chanSize := int(math.Ceil(float64(w.currentCapacity()) / eventFreshDuration.Seconds()))
662+
chanSize := int(math.Ceil(float64(w.currentCapacity()) / w.eventFreshDuration.Seconds()))
664663

665664
// Finally we adjust the size to avoid ending with too low or
666665
// too large values.

0 commit comments

Comments
 (0)