Skip to content

Commit d32e9b0

Browse files
authored
Merge pull request kubernetes#127982 from tkashem/refactor-store-decoder
KEP-3926: refactor: extract etcd3 store decode functions into an interface
2 parents 7c53005 + 1d1a656 commit d32e9b0

File tree

6 files changed

+122
-52
lines changed

6 files changed

+122
-52
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,20 @@ func newPodList() runtime.Object { return &example.PodList{} }
5757

5858
func newEtcdTestStorage(t testing.TB, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
5959
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
60+
versioner := storage.APIObjectVersioner{}
61+
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
6062
storage := etcd3.New(
6163
server.V3Client,
62-
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
64+
codec,
6365
newPod,
6466
newPodList,
6567
prefix,
6668
"/pods",
6769
schema.GroupResource{Resource: "pods"},
6870
identity.NewEncryptCheckTransformer(),
69-
etcd3.NewDefaultLeaseManagerConfig())
71+
etcd3.NewDefaultLeaseManagerConfig(),
72+
etcd3.NewDefaultDecoder(codec, versioner),
73+
versioner)
7074
return server, storage
7175
}
7276

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package etcd3
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"k8s.io/apimachinery/pkg/conversion"
25+
"k8s.io/apimachinery/pkg/runtime"
26+
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
27+
"k8s.io/apiserver/pkg/storage"
28+
29+
"k8s.io/klog/v2"
30+
)
31+
32+
// NewDefaultDecoder returns the default decoder for etcd3 store
33+
func NewDefaultDecoder(codec runtime.Codec, versioner storage.Versioner) Decoder {
34+
return &defaultDecoder{
35+
codec: codec,
36+
versioner: versioner,
37+
}
38+
}
39+
40+
// Decoder is used by the etcd storage implementation to decode
41+
// transformed data from the storage into an object
42+
type Decoder interface {
43+
// Decode decodes value of bytes into object. It will also
44+
// set the object resource version to rev.
45+
// On success, objPtr would be set to the object.
46+
Decode(value []byte, objPtr runtime.Object, rev int64) error
47+
48+
// DecodeListItem decodes bytes value in array into object.
49+
DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error)
50+
}
51+
52+
var _ Decoder = &defaultDecoder{}
53+
54+
type defaultDecoder struct {
55+
codec runtime.Codec
56+
versioner storage.Versioner
57+
}
58+
59+
// decode decodes value of bytes into object. It will also set the object resource version to rev.
60+
// On success, objPtr would be set to the object.
61+
func (d *defaultDecoder) Decode(value []byte, objPtr runtime.Object, rev int64) error {
62+
if _, err := conversion.EnforcePtr(objPtr); err != nil {
63+
// nolint:errorlint // this code was moved from store.go as is
64+
return fmt.Errorf("unable to convert output object to pointer: %v", err)
65+
}
66+
_, _, err := d.codec.Decode(value, nil, objPtr)
67+
if err != nil {
68+
return err
69+
}
70+
// being unable to set the version does not prevent the object from being extracted
71+
if err := d.versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
72+
klog.Errorf("failed to update object version: %v", err)
73+
}
74+
return nil
75+
}
76+
77+
// decodeListItem decodes bytes value in array into object.
78+
func (d *defaultDecoder) DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error) {
79+
startedAt := time.Now()
80+
defer func() {
81+
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
82+
}()
83+
84+
obj, _, err := d.codec.Decode(data, nil, newItemFunc())
85+
if err != nil {
86+
return nil, err
87+
}
88+
89+
if err := d.versioner.UpdateObject(obj, rev); err != nil {
90+
klog.Errorf("failed to update object version: %v", err)
91+
}
92+
93+
return obj, nil
94+
}

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

Lines changed: 12 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
"k8s.io/apimachinery/pkg/runtime/schema"
3939
"k8s.io/apimachinery/pkg/watch"
4040
"k8s.io/apiserver/pkg/audit"
41-
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
4241
"k8s.io/apiserver/pkg/features"
4342
"k8s.io/apiserver/pkg/storage"
4443
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
@@ -82,6 +81,7 @@ type store struct {
8281
groupResourceString string
8382
watcher *watcher
8483
leaseManager *leaseManager
84+
decoder Decoder
8585
}
8686

8787
func (s *store) RequestWatchProgress(ctx context.Context) error {
@@ -99,12 +99,11 @@ type objState struct {
9999
}
100100

101101
// New returns an etcd3 implementation of storage.Interface.
102-
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) storage.Interface {
103-
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig)
102+
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
103+
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner)
104104
}
105105

106-
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store {
107-
versioner := storage.APIObjectVersioner{}
106+
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
108107
// for compatibility with etcd2 impl.
109108
// no-op for default prefix of '/registry'.
110109
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
@@ -137,6 +136,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
137136
groupResourceString: groupResource.String(),
138137
watcher: w,
139138
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
139+
decoder: decoder,
140140
}
141141

142142
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
@@ -182,7 +182,7 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
182182
return storage.NewInternalError(err)
183183
}
184184

185-
err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
185+
err = s.decoder.Decode(data, out, kv.ModRevision)
186186
if err != nil {
187187
recordDecodeError(s.groupResourceString, preparedKey)
188188
return err
@@ -248,7 +248,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
248248

249249
if out != nil {
250250
putResp := txnResp.Responses[0].GetResponsePut()
251-
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
251+
err = s.decoder.Decode(data, out, putResp.Header.Revision)
252252
if err != nil {
253253
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
254254
recordDecodeError(s.groupResourceString, preparedKey)
@@ -379,7 +379,7 @@ func (s *store) conditionalDelete(
379379
return errors.New("invalid DeleteRange response - nil header")
380380
}
381381
if !skipTransformDecode {
382-
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
382+
err = s.decoder.Decode(origState.data, out, deleteResp.Header.Revision)
383383
if err != nil {
384384
recordDecodeError(s.groupResourceString, key)
385385
return err
@@ -496,7 +496,7 @@ func (s *store) GuaranteedUpdate(
496496
}
497497
// recheck that the data from etcd is not stale before short-circuiting a write
498498
if !origState.stale {
499-
err = decode(s.codec, s.versioner, origState.data, destination, origState.rev)
499+
err = s.decoder.Decode(origState.data, destination, origState.rev)
500500
if err != nil {
501501
recordDecodeError(s.groupResourceString, preparedKey)
502502
return err
@@ -547,7 +547,7 @@ func (s *store) GuaranteedUpdate(
547547
}
548548
putResp := txnResp.Responses[0].GetResponsePut()
549549

550-
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
550+
err = s.decoder.Decode(data, destination, putResp.Header.Revision)
551551
if err != nil {
552552
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
553553
recordDecodeError(s.groupResourceString, preparedKey)
@@ -779,7 +779,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
779779
default:
780780
}
781781

782-
obj, err := decodeListItem(ctx, data, uint64(kv.ModRevision), s.codec, s.versioner, newItemFunc)
782+
obj, err := s.decoder.DecodeListItem(ctx, data, uint64(kv.ModRevision), newItemFunc)
783783
if err != nil {
784784
recordDecodeError(s.groupResourceString, string(kv.Key))
785785
return err
@@ -939,7 +939,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
939939
state.data = data
940940
state.stale = stale
941941

942-
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
942+
if err := s.decoder.Decode(state.data, state.obj, state.rev); err != nil {
943943
recordDecodeError(s.groupResourceString, key)
944944
return nil, err
945945
}
@@ -1046,42 +1046,6 @@ func (s *store) prepareKey(key string) (string, error) {
10461046
return s.pathPrefix + key[startIndex:], nil
10471047
}
10481048

1049-
// decode decodes value of bytes into object. It will also set the object resource version to rev.
1050-
// On success, objPtr would be set to the object.
1051-
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
1052-
if _, err := conversion.EnforcePtr(objPtr); err != nil {
1053-
return fmt.Errorf("unable to convert output object to pointer: %v", err)
1054-
}
1055-
_, _, err := codec.Decode(value, nil, objPtr)
1056-
if err != nil {
1057-
return err
1058-
}
1059-
// being unable to set the version does not prevent the object from being extracted
1060-
if err := versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
1061-
klog.Errorf("failed to update object version: %v", err)
1062-
}
1063-
return nil
1064-
}
1065-
1066-
// decodeListItem decodes bytes value in array into object.
1067-
func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) (runtime.Object, error) {
1068-
startedAt := time.Now()
1069-
defer func() {
1070-
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
1071-
}()
1072-
1073-
obj, _, err := codec.Decode(data, nil, newItemFunc())
1074-
if err != nil {
1075-
return nil, err
1076-
}
1077-
1078-
if err := versioner.UpdateObject(obj, rev); err != nil {
1079-
klog.Errorf("failed to update object version: %v", err)
1080-
}
1081-
1082-
return obj, nil
1083-
}
1084-
10851049
// recordDecodeError record decode error split by object type.
10861050
func recordDecodeError(resource string, key string) {
10871051
metrics.RecordDecodeError(resource)

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli
566566
if setupOpts.recorderEnabled {
567567
client.KV = &clientRecorder{KV: client.KV}
568568
}
569+
versioner := storage.APIObjectVersioner{}
569570
store := newStore(
570571
client,
571572
setupOpts.codec,
@@ -576,6 +577,8 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli
576577
setupOpts.groupResource,
577578
setupOpts.transformer,
578579
setupOpts.leaseConfig,
580+
NewDefaultDecoder(setupOpts.codec, versioner),
581+
versioner,
579582
)
580583
ctx := context.Background()
581584
return ctx, store, client

staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,10 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
459459
if transformer == nil {
460460
transformer = identity.NewEncryptCheckTransformer()
461461
}
462-
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig), destroyFunc, nil
462+
463+
versioner := storage.APIObjectVersioner{}
464+
decoder := etcd3.NewDefaultDecoder(c.Codec, versioner)
465+
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner), destroyFunc, nil
463466
}
464467

465468
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

staging/src/k8s.io/apiserver/pkg/storage/util_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ func TestHighWaterMark(t *testing.T) {
8484
func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
8585
// test data
8686
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
87+
versioner := storage.APIObjectVersioner{}
88+
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion)
8789
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
88-
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig())
90+
storage := etcd3.New(server.V3Client, codec, func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig(), etcd3.NewDefaultDecoder(codec, versioner), versioner)
8991
return server, storage
9092
}
9193
server, etcdStorage := newEtcdTestStorage(t, "")

0 commit comments

Comments
 (0)