Skip to content

Commit a94fb53

Browse files
committed
Enable progress notify events in watchcache
1 parent 4af1328 commit a94fb53

File tree

3 files changed

+52
-15
lines changed

3 files changed

+52
-15
lines changed

staging/src/k8s.io/apiserver/pkg/features/kube_features.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ const (
138138
//
139139
// Allows sending warning headers in API responses.
140140
WarningHeaders featuregate.Feature = "WarningHeaders"
141+
142+
// owner: @wojtek-t
143+
// alpha: v1.20
144+
//
145+
// Allows for updating watchcache resource version with progress notify events.
146+
EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption"
141147
)
142148

143149
func init() {
@@ -148,18 +154,19 @@ func init() {
148154
// To add a new feature, define a key for it above and add it here. The features will be
149155
// available throughout Kubernetes binaries.
150156
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
151-
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
152-
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
153-
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
154-
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
155-
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
156-
DryRun: {Default: true, PreRelease: featuregate.GA},
157-
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
158-
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
159-
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
160-
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
161-
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
162-
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
163-
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
164-
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
157+
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
158+
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
159+
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
160+
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
161+
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
162+
DryRun: {Default: true, PreRelease: featuregate.GA},
163+
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
164+
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
165+
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
166+
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
167+
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
168+
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
169+
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
170+
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
171+
EfficientWatchResumption: {Default: false, PreRelease: featuregate.Alpha},
165172
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,14 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
10981098

10991099
// Implements cache.ListerWatcher interface.
11001100
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
1101-
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything})
1101+
opts := storage.ListOptions{
1102+
ResourceVersion: options.ResourceVersion,
1103+
Predicate: storage.Everything,
1104+
}
1105+
if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) {
1106+
opts.ProgressNotify = true
1107+
}
1108+
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts)
11021109
}
11031110

11041111
// errWatcher implements watch.Interface to return a single error

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,29 @@ func (w *watchCache) doCacheResizeLocked(capacity int) {
381381
w.capacity = capacity
382382
}
383383

384+
func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
385+
rv, err := w.versioner.ParseResourceVersion(resourceVersion)
386+
if err != nil {
387+
klog.Errorf("Couldn't parse resourceVersion: %v", err)
388+
return
389+
}
390+
391+
w.Lock()
392+
defer w.Unlock()
393+
w.resourceVersion = rv
394+
395+
// Don't dispatch bookmarks coming from the storage layer.
396+
// They can be very frequent (even to the level of subseconds)
397+
// to allow efficient watch resumption on kube-apiserver restarts,
398+
// and propagating them down may overload the whole system.
399+
//
400+
// TODO: If at some point we decide the performance and scalability
401+
// footprint is acceptable, this is the place to hook them in.
402+
// However, we then need to check if this was called as a result
403+
// of a bookmark event or regular Add/Update/Delete operation by
404+
// checking if resourceVersion here has changed.
405+
}
406+
384407
// List returns list of pointers to <storeElement> objects.
385408
func (w *watchCache) List() []interface{} {
386409
return w.store.List()

0 commit comments

Comments
 (0)