Skip to content

Commit 984b475

Browse files
committed
Extract delegator.Helper interface to allow making delegate decision based on cache state
1 parent 917a556 commit 984b475

File tree

6 files changed

+118
-99
lines changed

6 files changed

+118
-99
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
4949
"k8s.io/apiserver/pkg/features"
5050
"k8s.io/apiserver/pkg/storage"
51+
"k8s.io/apiserver/pkg/storage/cacher/delegator"
5152
"k8s.io/apiserver/pkg/storage/cacher/metrics"
5253
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
5354
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
@@ -334,9 +335,12 @@ func TestShouldDelegateList(t *testing.T) {
334335
expectBypass = bypass
335336
}
336337
}
337-
gotBypass, _ := shouldDelegateList(toStorageOpts(opt))
338-
if gotBypass != expectBypass {
339-
t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", opt, expectBypass, gotBypass)
338+
result, err := shouldDelegateList(toStorageOpts(opt), delegator.CacheWithoutSnapshots{})
339+
if err != nil {
340+
t.Fatal(err)
341+
}
342+
if result.ShouldDelegate != expectBypass {
343+
t.Errorf("Unexpected bypass result for List request with options %+v, bypass expected: %v, got: %v", opt, expectBypass, result.ShouldDelegate)
340344
}
341345
}
342346
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ import (
3636
"k8s.io/apiserver/pkg/audit"
3737
"k8s.io/apiserver/pkg/features"
3838
"k8s.io/apiserver/pkg/storage"
39+
"k8s.io/apiserver/pkg/storage/cacher/delegator"
3940
"k8s.io/apiserver/pkg/storage/cacher/metrics"
40-
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
4141
utilfeature "k8s.io/apiserver/pkg/util/feature"
4242
"k8s.io/component-base/tracing"
4343
"k8s.io/klog/v2"
@@ -180,8 +180,11 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
180180
if err != nil {
181181
return err
182182
}
183-
shouldDelegate, consistentRead := shouldDelegateList(opts)
184-
if shouldDelegate {
183+
result, err := shouldDelegateList(opts, delegator.CacheWithoutSnapshots{})
184+
if err != nil {
185+
return err
186+
}
187+
if result.ShouldDelegate {
185188
return c.storage.GetList(ctx, key, opts, listObj)
186189
}
187190

@@ -203,7 +206,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
203206
return c.storage.GetList(ctx, key, opts, listObj)
204207
}
205208
}
206-
if consistentRead {
209+
if result.ConsistentRead {
207210
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
208211
if err != nil {
209212
return err
@@ -215,7 +218,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
215218
success := "true"
216219
fallback := "false"
217220
if err != nil {
218-
if consistentRead {
221+
if result.ConsistentRead {
219222
if storage.IsTooLargeResourceVersion(err) {
220223
fallback = "true"
221224
// Reset resourceVersion during fallback from consistent read.
@@ -229,7 +232,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
229232
}
230233
return err
231234
}
232-
if consistentRead {
235+
if result.ConsistentRead {
233236
metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1)
234237
}
235238
return nil
@@ -243,36 +246,32 @@ func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
243246
return noLabelSelector && noFieldSelector && hasLimit
244247
}
245248

246-
// NOTICE: Keep in sync with shouldListFromStorage function in
249+
// NOTICE: Keep in sync with shouldDelegateList function in
247250
//
248251
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
249-
func shouldDelegateList(opts storage.ListOptions) (shouldDeletage, consistentRead bool) {
252+
func shouldDelegateList(opts storage.ListOptions, cache delegator.Helper) (delegator.Result, error) {
250253
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
251-
consistentRead = false
252254
switch opts.ResourceVersionMatch {
253255
case metav1.ResourceVersionMatchExact:
254-
return true, consistentRead
256+
return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
255257
case metav1.ResourceVersionMatchNotOlderThan:
256-
return false, consistentRead
258+
return delegator.Result{ShouldDelegate: false}, nil
257259
case "":
258260
// Legacy exact match
259261
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
260-
return true, consistentRead
262+
return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
261263
}
262264
// Continue
263265
if len(opts.Predicate.Continue) > 0 {
264-
return true, consistentRead
266+
return cache.ShouldDelegateContinue(opts.Predicate.Continue, opts.Recursive)
265267
}
266268
// Consistent Read
267269
if opts.ResourceVersion == "" {
268-
consistentRead = true
269-
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
270-
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
271-
return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead
270+
return cache.ShouldDelegateConsistentRead()
272271
}
273-
return false, consistentRead
272+
return delegator.Result{ShouldDelegate: false}, nil
274273
default:
275-
return true, consistentRead
274+
return delegator.Result{ShouldDelegate: true}, nil
276275
}
277276
}
278277

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package delegator
18+
19+
import (
20+
"k8s.io/apiserver/pkg/features"
21+
"k8s.io/apiserver/pkg/storage"
22+
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
23+
utilfeature "k8s.io/apiserver/pkg/util/feature"
24+
)
25+
26+
type Helper interface {
27+
ShouldDelegateExactRV(rv string, recursive bool) (Result, error)
28+
ShouldDelegateContinue(continueToken string, recursive bool) (Result, error)
29+
ShouldDelegateConsistentRead() (Result, error)
30+
}
31+
32+
// Result of delegator decision.
33+
type Result struct {
34+
// Whether a request cannot be served by cache and should be delegated to etcd.
35+
ShouldDelegate bool
36+
// Whether a request is a consistent read, used by delegator to decide if it should call GetCurrentResourceVersion to get RV.
37+
// Included in interface as only cacher has keyPrefix needed to parse continue token.
38+
ConsistentRead bool
39+
}
40+
41+
type CacheWithoutSnapshots struct{}
42+
43+
var _ Helper = CacheWithoutSnapshots{}
44+
45+
func (c CacheWithoutSnapshots) ShouldDelegateContinue(continueToken string, recursive bool) (Result, error) {
46+
return Result{
47+
ShouldDelegate: true,
48+
// Continue with negative RV is considered a consistent read, however token cannot be parsed without keyPrefix unavailable in staging/src/k8s.io/apiserver/pkg/util/flow_control/request/list_work_estimator.go.
49+
ConsistentRead: false,
50+
}, nil
51+
}
52+
53+
func (c CacheWithoutSnapshots) ShouldDelegateExactRV(rv string, recursive bool) (Result, error) {
54+
return Result{
55+
ShouldDelegate: true,
56+
ConsistentRead: false,
57+
}, nil
58+
}
59+
60+
func (c CacheWithoutSnapshots) ShouldDelegateConsistentRead() (Result, error) {
61+
return Result{
62+
ShouldDelegate: !ConsistentReadSupported(),
63+
ConsistentRead: true,
64+
}, nil
65+
}
66+
67+
// ConsistentReadSupported returns whether cache can be used to serve reads with RV not yet observed by cache, including both consistent reads.
68+
// Function is located here to avoid import cycles between staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go and staging/src/k8s.io/apiserver/pkg/util/flow_control/request/list_work_estimator.go.
69+
func ConsistentReadSupported() bool {
70+
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
71+
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
72+
return consistentListFromCacheEnabled && requestWatchProgressSupported
73+
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator_test.go

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ import (
2020
"context"
2121
"testing"
2222

23-
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
24-
"k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
2523
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2624
"k8s.io/apimachinery/pkg/runtime"
2725
"k8s.io/apiserver/pkg/apis/example"
@@ -194,56 +192,3 @@ func TestCalculateDigest(t *testing.T) {
194192
})
195193
}
196194
}
197-
198-
func TestValidateUndelegatedListOptions(t *testing.T) {
199-
opts := []storage.ListOptions{}
200-
keyPrefix := "/pods/"
201-
continueOnRV1, err := storage.EncodeContinue("/pods/a", keyPrefix, 1)
202-
if err != nil {
203-
t.Fatalf("Unexpected error: %v", err)
204-
}
205-
continueOnNegativeRV, err := storage.EncodeContinue("/pods/a", keyPrefix, -1)
206-
if err != nil {
207-
t.Fatalf("Unexpected error: %v", err)
208-
}
209-
for _, rv := range []string{"", "0", "1"} {
210-
for _, match := range []metav1.ResourceVersionMatch{"", metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan} {
211-
for _, c := range []string{"", continueOnRV1, continueOnNegativeRV} {
212-
for _, limit := range []int64{0, 100} {
213-
for _, recursive := range []bool{true, false} {
214-
opt := storage.ListOptions{
215-
ResourceVersion: rv,
216-
ResourceVersionMatch: match,
217-
Predicate: storage.SelectionPredicate{
218-
Limit: limit,
219-
Continue: c,
220-
},
221-
Recursive: recursive,
222-
}
223-
// Skip requests that will not pass apiserver validation
224-
if errs := validation.ValidateListOptions(&internalversion.ListOptions{
225-
ResourceVersion: opt.ResourceVersion,
226-
ResourceVersionMatch: opt.ResourceVersionMatch,
227-
Limit: opt.Predicate.Limit,
228-
Continue: opt.Predicate.Continue,
229-
}, false); len(errs) != 0 {
230-
continue
231-
}
232-
// Skip requests sent directly to etcd
233-
if delegateList, _ := shouldDelegateList(opt); delegateList {
234-
continue
235-
}
236-
opts = append(opts, opt)
237-
}
238-
239-
}
240-
}
241-
}
242-
}
243-
for _, opt := range opts {
244-
_, _, err := storage.ValidateListOptions(keyPrefix, storage.APIObjectVersioner{}, opt)
245-
if err != nil {
246-
t.Errorf("Expected List requests %+v to pass validation, got: %v", opt, err)
247-
}
248-
}
249-
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ import (
3232
"k8s.io/apimachinery/pkg/watch"
3333
"k8s.io/apiserver/pkg/features"
3434
"k8s.io/apiserver/pkg/storage"
35+
"k8s.io/apiserver/pkg/storage/cacher/delegator"
3536
"k8s.io/apiserver/pkg/storage/cacher/metrics"
3637
"k8s.io/apiserver/pkg/storage/cacher/progress"
37-
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
3838
utilfeature "k8s.io/apiserver/pkg/util/feature"
3939
"k8s.io/client-go/tools/cache"
4040
"k8s.io/component-base/tracing"
@@ -496,8 +496,7 @@ func (s sortableStoreElements) Swap(i, j int) {
496496
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
497497
// with their ResourceVersion and the name of the index, if any, that was used.
498498
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) {
499-
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
500-
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
499+
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
501500
w.waitingUntilFresh.Add()
502501
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
503502
w.waitingUntilFresh.Remove()
@@ -562,8 +561,7 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool {
562561
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
563562
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
564563
var err error
565-
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
566-
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
564+
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
567565
w.waitingUntilFresh.Add()
568566
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
569567
w.waitingUntilFresh.Remove()

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ import (
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2424
"k8s.io/apimachinery/pkg/runtime/schema"
2525
apirequest "k8s.io/apiserver/pkg/endpoints/request"
26-
"k8s.io/apiserver/pkg/features"
27-
"k8s.io/apiserver/pkg/storage"
28-
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
29-
utilfeature "k8s.io/apiserver/pkg/util/feature"
26+
"k8s.io/apiserver/pkg/storage/cacher/delegator"
3027
"k8s.io/klog/v2"
3128
)
3229

@@ -85,7 +82,12 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
8582
return WorkEstimate{InitialSeats: e.config.MinimumSeats}
8683
}
8784
}
88-
listFromStorage, _ := shouldListFromStorage(&listOptions)
85+
// TODO: Check whether watchcache is enabled.
86+
result, err := shouldDelegateList(&listOptions, delegator.CacheWithoutSnapshots{})
87+
if err != nil {
88+
return WorkEstimate{InitialSeats: maxSeats}
89+
}
90+
listFromStorage := result.ShouldDelegate
8991
isListFromCache := requestInfo.Verb == "watch" || !listFromStorage
9092

9193
numStored, err := e.countGetterFn(key(requestInfo))
@@ -162,32 +164,30 @@ func key(requestInfo *apirequest.RequestInfo) string {
162164
// NOTICE: Keep in sync with shouldDelegateList function in
163165
//
164166
// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
165-
func shouldListFromStorage(opts *metav1.ListOptions) (shouldDeletage, consistentRead bool) {
167+
func shouldDelegateList(opts *metav1.ListOptions, cache delegator.Helper) (delegator.Result, error) {
166168
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
167-
consistentRead = false
168169
switch opts.ResourceVersionMatch {
169170
case metav1.ResourceVersionMatchExact:
170-
return true, consistentRead
171+
return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive)
171172
case metav1.ResourceVersionMatchNotOlderThan:
172-
return false, consistentRead
173+
return delegator.Result{ShouldDelegate: false}, nil
173174
case "":
174175
// Legacy exact match
175176
if opts.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
176-
return true, consistentRead
177+
return cache.ShouldDelegateExactRV(opts.ResourceVersion, defaultRecursive)
177178
}
178179
// Continue
179180
if len(opts.Continue) > 0 {
180-
return true, consistentRead
181+
return cache.ShouldDelegateContinue(opts.Continue, defaultRecursive)
181182
}
182183
// Consistent Read
183184
if opts.ResourceVersion == "" {
184-
consistentRead = true
185-
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
186-
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
187-
return !consistentListFromCacheEnabled || !requestWatchProgressSupported, consistentRead
185+
return cache.ShouldDelegateConsistentRead()
188186
}
189-
return false, consistentRead
187+
return delegator.Result{ShouldDelegate: false}, nil
190188
default:
191-
return true, consistentRead
189+
return delegator.Result{ShouldDelegate: true}, nil
192190
}
193191
}
192+
193+
var defaultRecursive = true

0 commit comments

Comments
 (0)