Skip to content

Commit 0f39af9

Browse files
authored
Merge pull request kubernetes#94364 from wojtek-t/efficient_watch_resumption
Efficient watch resumption
2 parents d666956 + af61e8f commit 0f39af9

File tree

29 files changed

+480
-84
lines changed

29 files changed

+480
-84
lines changed

pkg/controlplane/instance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
258258
if err != nil {
259259
klog.Fatalf("Error determining service IP ranges: %v", err)
260260
}
261-
leaseStorage, _, err := storagefactory.Create(*config)
261+
leaseStorage, _, err := storagefactory.Create(*config, nil)
262262
if err != nil {
263263
klog.Fatalf("Error creating storage factory: %v", err)
264264
}

pkg/registry/core/pod/rest/log_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func TestPodLogValidates(t *testing.T) {
3131
config, server := registrytest.NewEtcdStorage(t, "")
3232
defer server.Terminate(t)
33-
s, destroyFunc, err := generic.NewRawStorage(config)
33+
s, destroyFunc, err := generic.NewRawStorage(config, nil)
3434
if err != nil {
3535
t.Fatalf("Unexpected error: %v", err)
3636
}

pkg/registry/core/service/allocator/storage/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ var _ rangeallocation.RangeRegistry = &Etcd{}
6262
// NewEtcd returns an allocator that is backed by Etcd and can manage
6363
// persisting the snapshot state of allocation after each allocation is made.
6464
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) (*Etcd, error) {
65-
storage, d, err := generic.NewRawStorage(config)
65+
storage, d, err := generic.NewRawStorage(config, nil)
6666
if err != nil {
6767
return nil, err
6868
}

pkg/registry/core/service/ipallocator/storage/storage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, ipallocator.Interfa
5454
if err != nil {
5555
t.Fatalf("unexpected error creating etcd: %v", err)
5656
}
57-
s, d, err := generic.NewRawStorage(etcdStorage)
57+
s, d, err := generic.NewRawStorage(etcdStorage, nil)
5858
if err != nil {
5959
t.Fatalf("Couldn't create storage: %v", err)
6060
}

pkg/registry/core/service/portallocator/storage/storage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter
5757
if err != nil {
5858
t.Fatalf("unexpected error creating etcd: %v", err)
5959
}
60-
s, d, err := generic.NewRawStorage(etcdStorage)
60+
s, d, err := generic.NewRawStorage(etcdStorage, nil)
6161
if err != nil {
6262
t.Fatalf("Couldn't create storage: %v", err)
6363
}

staging/src/k8s.io/apiserver/pkg/features/kube_features.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ const (
138138
//
139139
// Allows sending warning headers in API responses.
140140
WarningHeaders featuregate.Feature = "WarningHeaders"
141+
142+
// owner: @wojtek-t
143+
// alpha: v1.20
144+
//
145+
// Allows for updating watchcache resource version with progress notify events.
146+
EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption"
141147
)
142148

143149
func init() {
@@ -148,18 +154,19 @@ func init() {
148154
// To add a new feature, define a key for it above and add it here. The features will be
149155
// available throughout Kubernetes binaries.
150156
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
151-
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
152-
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
153-
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
154-
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
155-
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
156-
DryRun: {Default: true, PreRelease: featuregate.GA},
157-
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
158-
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
159-
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
160-
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
161-
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
162-
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
163-
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
164-
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
157+
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
158+
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
159+
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
160+
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
161+
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
162+
DryRun: {Default: true, PreRelease: featuregate.GA},
163+
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
164+
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
165+
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
166+
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
167+
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
168+
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
169+
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
170+
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
171+
EfficientWatchResumption: {Default: false, PreRelease: featuregate.Alpha},
165172
}

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import (
3838
func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) {
3939
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
4040
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
41-
s, destroy, err := factory.Create(*sc)
41+
s, destroy, err := factory.Create(*sc, nil)
4242
if err != nil {
4343
t.Fatalf("Error creating storage: %v", err)
4444
}

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func StorageWithCacher() generic.StorageDecorator {
4545
triggerFuncs storage.IndexerFuncs,
4646
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
4747

48-
s, d, err := generic.NewRawStorage(storageConfig)
48+
s, d, err := generic.NewRawStorage(storageConfig, newFunc)
4949
if err != nil {
5050
return s, d, err
5151
}

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,8 +1601,11 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
16011601
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
16021602
strategy := &testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true}
16031603

1604+
newFunc := func() runtime.Object { return &example.Pod{} }
1605+
newListFunc := func() runtime.Object { return &example.PodList{} }
1606+
16041607
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
1605-
s, dFunc, err := factory.Create(*sc)
1608+
s, dFunc, err := factory.Create(*sc, newFunc)
16061609
if err != nil {
16071610
t.Fatalf("Error creating storage: %v", err)
16081611
}
@@ -1617,8 +1620,8 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
16171620
ResourcePrefix: podPrefix,
16181621
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
16191622
GetAttrsFunc: getPodAttrs,
1620-
NewFunc: func() runtime.Object { return &example.Pod{} },
1621-
NewListFunc: func() runtime.Object { return &example.PodList{} },
1623+
NewFunc: newFunc,
1624+
NewListFunc: newListFunc,
16221625
Codec: sc.Codec,
16231626
}
16241627
cacher, err := cacherstorage.NewCacherFromConfig(config)

staging/src/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ func UndecoratedStorage(
4747
getAttrsFunc storage.AttrFunc,
4848
trigger storage.IndexerFuncs,
4949
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
50-
return NewRawStorage(config)
50+
return NewRawStorage(config, newFunc)
5151
}
5252

5353
// NewRawStorage creates the low level kv storage. This is a work-around for current
5454
// two layer of same storage interface.
5555
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
56-
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
57-
return factory.Create(*config)
56+
func NewRawStorage(config *storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
57+
return factory.Create(*config, newFunc)
5858
}

0 commit comments

Comments
 (0)