Skip to content

Commit 0130072

Browse files
committed
Serve watch without resourceVersion from cache and introduce a WatchFromStorageWithoutResourceVersion feature gate to allow serving watch from storage.
1 parent 656cb10 commit 0130072

File tree

5 files changed

+60
-17
lines changed

5 files changed

+60
-17
lines changed

pkg/features/kube_features.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,6 +1307,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
13071307

13081308
genericfeatures.WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
13091309

1310+
genericfeatures.WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
1311+
13101312
genericfeatures.WatchList: {Default: false, PreRelease: featuregate.Alpha},
13111313

13121314
genericfeatures.ZeroLimitedNominalConcurrencyShares: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32

staging/src/k8s.io/apiserver/pkg/features/kube_features.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,12 @@ const (
258258
// Enables support for watch bookmark events.
259259
WatchBookmark featuregate.Feature = "WatchBookmark"
260260

261+
// owner: @serathius
262+
// beta: 1.30
263+
// Enables watches without resourceVersion to be served from storage.
264+
// Used to prevent https://github.com/kubernetes/kubernetes/issues/123072 until etcd fixes the issue.
265+
WatchFromStorageWithoutResourceVersion featuregate.Feature = "WatchFromStorageWithoutResourceVersion"
266+
261267
// owner: @vinaykul
262268
// kep: http://kep.k8s.io/1287
263269
// alpha: v1.27
@@ -349,6 +355,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
349355

350356
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
351357

358+
WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
359+
352360
InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},
353361

354362
WatchList: {Default: false, PreRelease: featuregate.Alpha},

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
523523
opts.SendInitialEvents = nil
524524
}
525525
// TODO: we should eventually get rid of this legacy case
526-
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
526+
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
527527
return c.storage.Watch(ctx, key, opts)
528528
}
529529
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
@@ -1282,12 +1282,14 @@ func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(parsedResourceVersion
12821282
//
12831283
// if SendInitiaEvents != nil => ResourceVersionMatch = NotOlderThan
12841284
// if ResourceVersionmatch != nil => ResourceVersionMatch = NotOlderThan & SendInitialEvents != nil
1285-
//
1286-
// to satisfy the legacy case (SendInitialEvents = true, RV="") we skip checking opts.Predicate.AllowWatchBookmarks
12871285
func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (uint64, error) {
12881286
if len(opts.ResourceVersion) != 0 {
12891287
return parsedWatchResourceVersion, nil
12901288
}
1289+
// legacy case
1290+
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
1291+
return 0, nil
1292+
}
12911293
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
12921294
return rv, err
12931295
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,9 +381,18 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
381381
}
382382

383383
func TestWatchSemantics(t *testing.T) {
384-
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
385-
t.Cleanup(terminate)
386-
storagetesting.RunWatchSemantics(context.TODO(), t, store)
384+
t.Run("WatchFromStorageWithoutResourceVersion=true", func(t *testing.T) {
385+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
386+
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
387+
t.Cleanup(terminate)
388+
storagetesting.RunWatchSemantics(context.TODO(), t, store)
389+
})
390+
t.Run("WatchFromStorageWithoutResourceVersion=false", func(t *testing.T) {
391+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
392+
store, terminate := testSetupWithEtcdAndCreateWrapper(t)
393+
t.Cleanup(terminate)
394+
storagetesting.RunWatchSemantics(context.TODO(), t, store)
395+
})
387396
}
388397

389398
func TestWatchSemanticInitialEventsExtended(t *testing.T) {

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,6 @@ func TestWatchCacheBypass(t *testing.T) {
338338
t.Fatalf("unexpected error waiting for the cache to be ready")
339339
}
340340

341-
// Inject error to underlying layer and check if cacher is not bypassed.
342-
backingStorage.injectError(errDummy)
343341
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
344342
ResourceVersion: "0",
345343
Predicate: storage.Everything,
@@ -348,12 +346,32 @@ func TestWatchCacheBypass(t *testing.T) {
348346
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
349347
}
350348

351-
// With unset RV, check if cacher is bypassed.
352349
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
353350
ResourceVersion: "",
351+
Predicate: storage.Everything,
354352
})
355-
if err != errDummy {
356-
t.Errorf("Watch with unset RV should bypass cacher: %v", err)
353+
if err != nil {
354+
t.Errorf("Watch with RV=0 should be served from cache: %v", err)
355+
}
356+
357+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, false)()
358+
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
359+
ResourceVersion: "",
360+
Predicate: storage.Everything,
361+
})
362+
if err != nil {
363+
t.Errorf("With WatchFromStorageWithoutResourceVersion disabled, watch with unset RV should be served from cache: %v", err)
364+
}
365+
366+
// Inject error to underlying layer and check if cacher is not bypassed.
367+
backingStorage.injectError(errDummy)
368+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchFromStorageWithoutResourceVersion, true)()
369+
_, err = cacher.Watch(context.TODO(), "pod/ns", storage.ListOptions{
370+
ResourceVersion: "",
371+
Predicate: storage.Everything,
372+
})
373+
if !errors.Is(err, errDummy) {
374+
t.Errorf("With WatchFromStorageWithoutResourceVersion enabled, watch with unset RV should be served from storage: %v", err)
357375
}
358376
}
359377

@@ -2032,9 +2050,11 @@ func TestGetWatchCacheResourceVersion(t *testing.T) {
20322050
// | Unset | true/false | nil/true/false |
20332051
// +-----------------+---------------------+-----------------------+
20342052
{
2035-
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
2036-
opts: listOptions(true, nil, ""),
2037-
expectedWatchResourceVersion: 100,
2053+
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=nil",
2054+
opts: listOptions(true, nil, ""),
2055+
// Expecting RV 0, due to https://github.com/kubernetes/kubernetes/pull/123935 reverted to serving those requests from watch cache.
2056+
// Set to 100, when WatchFromStorageWithoutResourceVersion is set to true.
2057+
expectedWatchResourceVersion: 0,
20382058
},
20392059
{
20402060
name: "RV=unset, allowWatchBookmarks=true, sendInitialEvents=true",
@@ -2047,9 +2067,11 @@ func TestGetWatchCacheResourceVersion(t *testing.T) {
20472067
expectedWatchResourceVersion: 100,
20482068
},
20492069
{
2050-
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
2051-
opts: listOptions(false, nil, ""),
2052-
expectedWatchResourceVersion: 100,
2070+
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=nil",
2071+
opts: listOptions(false, nil, ""),
2072+
// Expecting RV 0, due to https://github.com/kubernetes/kubernetes/pull/123935 reverted to serving those requests from watch cache.
2073+
// Set to 100, when WatchFromStorageWithoutResourceVersion is set to true.
2074+
expectedWatchResourceVersion: 0,
20532075
},
20542076
{
20552077
name: "RV=unset, allowWatchBookmarks=false, sendInitialEvents=true, legacy",

0 commit comments

Comments
 (0)