Skip to content

Commit 2e073d6

Browse files
authored
Merge pull request kubernetes#130400 from serathius/storage-current-rv
Move GetCurrentResourceVersion to storage.Interface
2 parents 3d9fcb7 + fea89f2 commit 2e073d6

File tree

9 files changed

+133
-123
lines changed

9 files changed

+133
-123
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1248,7 +1248,7 @@ func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchRe
12481248
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
12491249
return 0, nil
12501250
}
1251-
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
1251+
rv, err := c.storage.GetCurrentResourceVersion(ctx)
12521252
return rv, err
12531253
}
12541254

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ type dummyStorage struct {
113113
sync.RWMutex
114114
err error
115115
getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
116+
getRVFn func(_ context.Context) (uint64, error)
116117
watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
117118

118119
// use getRequestWatchProgressCounter when reading
@@ -199,6 +200,13 @@ func (d *dummyStorage) injectError(err error) {
199200
d.err = err
200201
}
201202

203+
func (d *dummyStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
204+
if d.getRVFn != nil {
205+
return d.getRVFn(ctx)
206+
}
207+
return 100, nil
208+
}
209+
202210
func TestGetListCacheBypass(t *testing.T) {
203211
type opts struct {
204212
ResourceVersion string
@@ -463,6 +471,14 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
463471
podList.ResourceVersion = tc.storageRV
464472
return nil
465473
}
474+
backingStorage.getRVFn = func(_ context.Context) (uint64, error) {
475+
requestToStorageCount += 1
476+
rv, err := strconv.Atoi(tc.storageRV)
477+
if err != nil {
478+
t.Fatalf("failed to parse RV: %s", err)
479+
}
480+
return uint64(rv), nil
481+
}
466482
result := &example.PodList{}
467483

468484
ctx, clockStepCancelFn := context.WithTimeout(context.TODO(), time.Minute)
@@ -2111,6 +2127,15 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
21112127
listAccessor.SetResourceVersion("105")
21122128
return nil
21132129
}
2130+
s.getRVFn = func(_ context.Context) (uint64, error) {
2131+
// the first call to this function
2132+
// primes the cacher
2133+
if !hasBeenPrimed {
2134+
hasBeenPrimed = true
2135+
return 100, nil
2136+
}
2137+
return 105, nil
2138+
}
21142139
return s
21152140
}(),
21162141
verifyBackingStore: func(t *testing.T, s *dummyStorage) {
@@ -2146,6 +2171,15 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
21462171
listAccessor.SetResourceVersion("105")
21472172
return nil
21482173
}
2174+
s.getRVFn = func(_ context.Context) (uint64, error) {
2175+
// the first call to this function
2176+
// primes the cacher
2177+
if !hasBeenPrimed {
2178+
hasBeenPrimed = true
2179+
return 100, nil
2180+
}
2181+
return 105, nil
2182+
}
21492183
return s
21502184
}(),
21512185
verifyBackingStore: func(t *testing.T, s *dummyStorage) {

staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ func (c *CacheDelegator) Create(ctx context.Context, key string, obj, out runtim
5757
return c.storage.Create(ctx, key, obj, out, ttl)
5858
}
5959

60+
func (c *CacheDelegator) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
61+
return c.storage.GetCurrentResourceVersion(ctx)
62+
}
63+
6064
func (c *CacheDelegator) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
6165
// Ignore the suggestion and try to pass down the current version of the object
6266
// read from cache.
@@ -160,7 +164,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
160164
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
161165
consistentRead := opts.ResourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
162166
if consistentRead {
163-
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.cacher.newListFunc, c.cacher.resourcePrefix, c.cacher.objectType.String())
167+
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
164168
if err != nil {
165169
return err
166170
}

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"path"
2424
"reflect"
25+
"strconv"
2526
"strings"
2627
"time"
2728

@@ -35,6 +36,8 @@ import (
3536
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3637
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3738
"k8s.io/apimachinery/pkg/conversion"
39+
"k8s.io/apimachinery/pkg/fields"
40+
"k8s.io/apimachinery/pkg/labels"
3841
"k8s.io/apimachinery/pkg/runtime"
3942
"k8s.io/apimachinery/pkg/runtime/schema"
4043
"k8s.io/apimachinery/pkg/watch"
@@ -84,6 +87,9 @@ type store struct {
8487
leaseManager *leaseManager
8588
decoder Decoder
8689
listErrAggrFactory func() ListErrorAggregator
90+
91+
resourcePrefix string
92+
newListFunc func() runtime.Object
8793
}
8894

8995
func (s *store) RequestWatchProgress(ctx context.Context) error {
@@ -185,10 +191,13 @@ func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc fu
185191
leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig),
186192
decoder: decoder,
187193
listErrAggrFactory: listErrAggrFactory,
194+
195+
resourcePrefix: resourcePrefix,
196+
newListFunc: newListFunc,
188197
}
189198

190199
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
191-
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
200+
return s.GetCurrentResourceVersion(ctx)
192201
}
193202
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
194203
etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress)
@@ -677,6 +686,37 @@ func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts sto
677686
return withRev, nil
678687
}
679688

689+
func (s *store) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
690+
emptyList := s.newListFunc()
691+
pred := storage.SelectionPredicate{
692+
Label: labels.Everything(),
693+
Field: fields.Everything(),
694+
Limit: 1, // just in case we actually hit something
695+
}
696+
697+
err := s.GetList(ctx, s.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList)
698+
if err != nil {
699+
return 0, err
700+
}
701+
emptyListAccessor, err := meta.ListAccessor(emptyList)
702+
if err != nil {
703+
return 0, err
704+
}
705+
if emptyListAccessor == nil {
706+
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
707+
}
708+
709+
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
710+
if err != nil {
711+
return 0, err
712+
}
713+
714+
if currentResourceVersion == 0 {
715+
return 0, fmt.Errorf("the current resource version must be greater than 0")
716+
}
717+
return uint64(currentResourceVersion), nil
718+
}
719+
680720
// GetList implements storage.Interface.
681721
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
682722
keyPrefix, err := s.prepareKey(key)

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"testing"
2929

3030
"github.com/go-logr/logr"
31+
"github.com/stretchr/testify/require"
3132
clientv3 "go.etcd.io/etcd/client/v3"
3233
"go.etcd.io/etcd/client/v3/kubernetes"
3334
"go.etcd.io/etcd/server/v3/embed"
@@ -994,3 +995,49 @@ func BenchmarkStoreList(b *testing.B) {
994995
func computePodKey(obj *example.Pod) string {
995996
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
996997
}
998+
999+
func TestGetCurrentResourceVersion(t *testing.T) {
1000+
ctx, store, _ := testSetup(t)
1001+
1002+
makePod := func(name string) *example.Pod {
1003+
return &example.Pod{
1004+
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
1005+
}
1006+
}
1007+
createPod := func(obj *example.Pod) *example.Pod {
1008+
key := "pods/" + obj.Namespace + "/" + obj.Name
1009+
out := &example.Pod{}
1010+
err := store.Create(context.TODO(), key, obj, out, 0)
1011+
require.NoError(t, err)
1012+
return out
1013+
}
1014+
getPod := func(name, ns string) *example.Pod {
1015+
key := "pods/" + ns + "/" + name
1016+
out := &example.Pod{}
1017+
err := store.Get(context.TODO(), key, storage.GetOptions{}, out)
1018+
require.NoError(t, err)
1019+
return out
1020+
}
1021+
1022+
// create a pod and make sure its RV is equal to the one maintained by etcd
1023+
pod := createPod(makePod("pod-1"))
1024+
currentStorageRV, err := store.GetCurrentResourceVersion(context.TODO())
1025+
require.NoError(t, err)
1026+
podRV, err := store.versioner.ParseResourceVersion(pod.ResourceVersion)
1027+
require.NoError(t, err)
1028+
require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV")
1029+
1030+
// now make unrelated write and make sure the target function returns global etcd RV
1031+
resp, err := store.client.KV.Put(ctx, "compact_rev_key", pod.ResourceVersion)
1032+
require.NoError(t, err)
1033+
currentStorageRV, err = store.GetCurrentResourceVersion(context.TODO())
1034+
require.NoError(t, err)
1035+
require.NoError(t, err)
1036+
require.Equal(t, currentStorageRV, uint64(resp.Header.Revision), "expected the global etcd RV to be equal to replicaset's RV")
1037+
1038+
// ensure that the pod's RV hasn't been changed
1039+
currentPod := getPod(pod.Name, pod.Namespace)
1040+
currentPodRV, err := store.versioner.ParseResourceVersion(currentPod.ResourceVersion)
1041+
require.NoError(t, err)
1042+
require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
1043+
}

staging/src/k8s.io/apiserver/pkg/storage/interfaces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,10 @@ type Interface interface {
262262
// TODO: Remove when storage.Interface will be separate from etc3.store.
263263
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
264264
RequestWatchProgress(ctx context.Context) error
265+
266+
// GetCurrentResourceVersion gets the current resource version from etcd.
267+
// This method issues an empty list request and reads only the ResourceVersion from the object metadata
268+
GetCurrentResourceVersion(ctx context.Context) (uint64, error)
265269
}
266270

267271
// GetOptions provides the options that may be provided for storage get operations.

staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
15321532
}
15331533

15341534
if scenario.useCurrentRV {
1535-
currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(ctx, store, func() runtime.Object { return &example.PodList{} }, "/pods", "")
1535+
currentStorageRV, err := store.GetCurrentResourceVersion(ctx)
15361536
require.NoError(t, err)
15371537
scenario.resourceVersion = fmt.Sprintf("%d", currentStorageRV)
15381538
}

staging/src/k8s.io/apiserver/pkg/storage/util.go

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@ limitations under the License.
1717
package storage
1818

1919
import (
20-
"context"
2120
"fmt"
22-
"strconv"
2321
"sync/atomic"
2422

2523
"k8s.io/apimachinery/pkg/api/meta"
2624
"k8s.io/apimachinery/pkg/api/validation/path"
2725
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28-
"k8s.io/apimachinery/pkg/fields"
29-
"k8s.io/apimachinery/pkg/labels"
3026
"k8s.io/apimachinery/pkg/runtime"
3127
)
3228

@@ -81,45 +77,6 @@ func (hwm *HighWaterMark) Update(current int64) bool {
8177
}
8278
}
8379

84-
// GetCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
85-
// This method issues an empty list request and reads only the ResourceVersion from the object metadata
86-
func GetCurrentResourceVersionFromStorage(ctx context.Context, storage Interface, newListFunc func() runtime.Object, resourcePrefix, objectType string) (uint64, error) {
87-
if storage == nil {
88-
return 0, fmt.Errorf("storage wasn't provided for %s", objectType)
89-
}
90-
if newListFunc == nil {
91-
return 0, fmt.Errorf("newListFunction wasn't provided for %s", objectType)
92-
}
93-
emptyList := newListFunc()
94-
pred := SelectionPredicate{
95-
Label: labels.Everything(),
96-
Field: fields.Everything(),
97-
Limit: 1, // just in case we actually hit something
98-
}
99-
100-
err := storage.GetList(ctx, resourcePrefix, ListOptions{Predicate: pred}, emptyList)
101-
if err != nil {
102-
return 0, err
103-
}
104-
emptyListAccessor, err := meta.ListAccessor(emptyList)
105-
if err != nil {
106-
return 0, err
107-
}
108-
if emptyListAccessor == nil {
109-
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
110-
}
111-
112-
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
113-
if err != nil {
114-
return 0, err
115-
}
116-
117-
if currentResourceVersion == 0 {
118-
return 0, fmt.Errorf("the current resource version must be greater than 0")
119-
}
120-
return uint64(currentResourceVersion), nil
121-
}
122-
12380
// AnnotateInitialEventsEndBookmark adds a special annotation to the given object
12481
// which indicates that the initial events have been sent.
12582
//

0 commit comments

Comments
 (0)