Skip to content

Commit 3596256

Browse files
committed
Implement fallback for consistent reads from cache
1 parent 0fc1671 commit 3596256

File tree

3 files changed

+163
-1
lines changed

3 files changed

+163
-1
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
848848
preparedKey += "/"
849849
}
850850
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
851-
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
851+
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
852+
if consistentRead {
852853
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
853854
if err != nil {
854855
return err
@@ -887,9 +888,24 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
887888
}
888889

889890
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
891+
success := "true"
892+
fallback := "false"
890893
if err != nil {
894+
if consistentRead {
895+
if storage.IsTooLargeResourceVersion(err) {
896+
fallback = "true"
897+
err = c.storage.GetList(ctx, key, opts, listObj)
898+
}
899+
if err != nil {
900+
success = "false"
901+
}
902+
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
903+
}
891904
return err
892905
}
906+
if consistentRead {
907+
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
908+
}
893909
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
894910
// store pointer of eligible objects,
895911
// Why not directly put object in the items of listObj?

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"reflect"
2525
goruntime "runtime"
2626
"strconv"
27+
"strings"
2728
"sync"
2829
"testing"
2930
"time"
@@ -45,10 +46,13 @@ import (
4546
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
4647
"k8s.io/apiserver/pkg/features"
4748
"k8s.io/apiserver/pkg/storage"
49+
"k8s.io/apiserver/pkg/storage/cacher/metrics"
4850
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
4951
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
5052
utilfeature "k8s.io/apiserver/pkg/util/feature"
5153
featuregatetesting "k8s.io/component-base/featuregate/testing"
54+
k8smetrics "k8s.io/component-base/metrics"
55+
"k8s.io/component-base/metrics/testutil"
5256
"k8s.io/utils/clock"
5357
testingclock "k8s.io/utils/clock/testing"
5458
"k8s.io/utils/pointer"
@@ -288,6 +292,138 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
288292
}
289293
}
290294

295+
// TestConsistentReadFallback calls Cacher.GetList with an empty
// ResourceVersion against a dummy backing storage and, for each case,
// verifies the returned error, the resource version of the resulting list,
// the number of list calls that reached the backing storage, whether the call
// took at least blockTimeout, and the exported value of the
// apiserver_watch_cache_consistent_read_total metric.
func TestConsistentReadFallback(t *testing.T) {
296+
tcs := []struct {
297+
name string
298+
// Value the ConsistentListFromCache feature gate is set to for the case.
consistentReadsEnabled bool
299+
// Resource version the storage reports while the cacher is being created.
watchCacheRV string
300+
// Resource version the storage reports once the cacher has been created.
storageRV string
301+
// When set, storage lists for keys other than cacher.resourcePrefix return errDummy.
fallbackError bool
302+
303+
expectError bool
304+
expectRV string
305+
// Whether GetList is expected to take at least blockTimeout.
expectBlock bool
306+
expectRequestsToStorage int
307+
// Expected text exposition of the metric; empty means no samples.
expectMetric string
308+
}{
309+
// Watch cache RV equals storage RV: one storage request, no blocking,
// metric recorded with fallback="false" and success="true".
{
310+
name: "Success",
311+
consistentReadsEnabled: true,
312+
watchCacheRV: "42",
313+
storageRV: "42",
314+
expectRV: "42",
315+
expectRequestsToStorage: 1,
316+
expectMetric: `
317+
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
318+
# TYPE apiserver_watch_cache_consistent_read_total counter
319+
apiserver_watch_cache_consistent_read_total{fallback="false", resource="pods", success="true"} 1
320+
`,
321+
},
322+
// Watch cache RV (2) is behind storage RV (42): the call is expected to
// block, issue a second storage request, and record fallback="true".
{
323+
name: "Fallback",
324+
consistentReadsEnabled: true,
325+
watchCacheRV: "2",
326+
storageRV: "42",
327+
expectRV: "42",
328+
expectBlock: true,
329+
expectRequestsToStorage: 2,
330+
expectMetric: `
331+
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
332+
# TYPE apiserver_watch_cache_consistent_read_total counter
333+
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="true"} 1
334+
`,
335+
},
336+
// Same setup as "Fallback", but the second storage request fails: an
// error is expected and the metric is recorded with success="false".
{
337+
name: "Fallback Failure",
338+
consistentReadsEnabled: true,
339+
watchCacheRV: "2",
340+
storageRV: "42",
341+
fallbackError: true,
342+
expectError: true,
343+
expectBlock: true,
344+
expectRequestsToStorage: 2,
345+
expectMetric: `
346+
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
347+
# TYPE apiserver_watch_cache_consistent_read_total counter
348+
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="false"} 1
349+
`,
350+
},
351+
// Feature gate left off: a single storage request and no metric samples.
{
352+
name: "Disabled",
353+
watchCacheRV: "2",
354+
storageRV: "42",
355+
expectRV: "42",
356+
expectRequestsToStorage: 1,
357+
expectMetric: ``,
358+
},
359+
}
360+
for _, tc := range tcs {
361+
t.Run(tc.name, func(t *testing.T) {
362+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.consistentReadsEnabled)
363+
if tc.consistentReadsEnabled {
364+
// NOTE(review): helper is defined outside this view; presumably it makes
// the RequestWatchProgress support check pass for the test — confirm.
forceRequestWatchProgressSupport(t)
365+
}
366+
367+
// Gather through a fresh registry and reset the shared counter vector
// before registering it, so the comparison at the end of the subtest is
// made against a counter that was cleared at the start of this subtest.
registry := k8smetrics.NewKubeRegistry()
368+
metrics.ConsistentReadTotal.Reset()
369+
if err := registry.Register(metrics.ConsistentReadTotal); err != nil {
370+
t.Errorf("unexpected error: %v", err)
371+
}
372+
backingStorage := &dummyStorage{}
373+
// While the cacher is being created, every storage list reports
// watchCacheRV; the check after newTestCacher confirms the watch cache
// ended up at that resource version.
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
374+
podList := listObj.(*example.PodList)
375+
podList.ResourceVersion = tc.watchCacheRV
376+
return nil
377+
}
378+
// TODO: Use fake clock for this test to reduce execution time.
379+
cacher, _, err := newTestCacher(backingStorage)
380+
if err != nil {
381+
t.Fatalf("Couldn't create cacher: %v", err)
382+
}
383+
defer cacher.Stop()
384+
385+
if fmt.Sprintf("%d", cacher.watchCache.resourceVersion) != tc.watchCacheRV {
386+
t.Fatalf("Expected watch cache RV to equal watchCacheRV, got: %d, want: %s", cacher.watchCache.resourceVersion, tc.watchCacheRV)
387+
}
388+
requestToStorageCount := 0
389+
// From here on, count every list call that reaches the storage. Lists
// keyed exactly at cacher.resourcePrefix always succeed with storageRV
// (NOTE(review): these look like the current-resource-version lookups
// issued by GetList — confirm). Any other key fails with errDummy when
// fallbackError is set and otherwise also reports storageRV.
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
390+
requestToStorageCount += 1
391+
podList := listObj.(*example.PodList)
392+
if key == cacher.resourcePrefix {
393+
podList.ResourceVersion = tc.storageRV
394+
return nil
395+
}
396+
if tc.fallbackError {
397+
return errDummy
398+
}
399+
podList.ResourceVersion = tc.storageRV
400+
return nil
401+
}
402+
result := &example.PodList{}
403+
// Time the call with the cacher's own clock so it can be compared against
// blockTimeout below.
start := cacher.clock.Now()
404+
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
405+
duration := cacher.clock.Since(start)
406+
if (err != nil) != tc.expectError {
407+
t.Fatalf("Unexpected error err: %v", err)
408+
}
409+
if result.ResourceVersion != tc.expectRV {
410+
t.Fatalf("Unexpected List response RV, got: %q, want: %q", result.ResourceVersion, tc.expectRV)
411+
}
412+
if requestToStorageCount != tc.expectRequestsToStorage {
413+
t.Fatalf("Unexpected number of requests to storage, got: %d, want: %d", requestToStorageCount, tc.expectRequestsToStorage)
414+
}
415+
// The call counts as blocked when it lasted at least blockTimeout.
blocked := duration >= blockTimeout
416+
if blocked != tc.expectBlock {
417+
t.Fatalf("Unexpected block, got: %v, want: %v", blocked, tc.expectBlock)
418+
}
419+
420+
// Compare only the consistent-read counter gathered from the registry
// against the expected exposition text for this case.
if err := testutil.GatherAndCompare(registry, strings.NewReader(tc.expectMetric), "apiserver_watch_cache_consistent_read_total"); err != nil {
421+
t.Errorf("unexpected error: %v", err)
422+
}
423+
})
424+
}
425+
}
426+
291427
func TestGetListNonRecursiveCacheBypass(t *testing.T) {
292428
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
293429
backingStorage := &dummyStorage{}

staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics/metrics.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,15 @@ var (
167167
StabilityLevel: compbasemetrics.ALPHA,
168168
Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3},
169169
}, []string{"resource"})
170+
171+
ConsistentReadTotal = compbasemetrics.NewCounterVec(
172+
&compbasemetrics.CounterOpts{
173+
Namespace: namespace,
174+
Subsystem: subsystem,
175+
Name: "consistent_read_total",
176+
Help: "Counter for consistent reads from cache.",
177+
StabilityLevel: compbasemetrics.ALPHA,
178+
}, []string{"resource", "success", "fallback"})
170179
)
171180

172181
var registerMetrics sync.Once
@@ -188,6 +197,7 @@ func Register() {
188197
legacyregistry.MustRegister(WatchCacheCapacity)
189198
legacyregistry.MustRegister(WatchCacheInitializations)
190199
legacyregistry.MustRegister(WatchCacheReadWait)
200+
legacyregistry.MustRegister(ConsistentReadTotal)
191201
})
192202
}
193203

0 commit comments

Comments
 (0)