Skip to content

Commit 066c1c0

Browse files
committed
Update recorders to wrap kubernetes.Client
1 parent 249ad2a commit 066c1c0

File tree

9 files changed

+52
-47
lines changed

9 files changed

+52
-47
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,42 +180,42 @@ func TestList(t *testing.T) {
180180
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
181181
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
182182
t.Cleanup(terminate)
183-
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
183+
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true)
184184
}
185185

186186
func TestListWithConsistentListFromCache(t *testing.T) {
187187
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
188188
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
189189
t.Cleanup(terminate)
190-
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
190+
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true)
191191
}
192192

193193
func TestConsistentList(t *testing.T) {
194194
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
195195
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
196196
t.Cleanup(terminate)
197-
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, false)
197+
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, false)
198198
}
199199

200200
func TestConsistentListWithConsistentListFromCache(t *testing.T) {
201201
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
202202
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
203203
t.Cleanup(terminate)
204-
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, true)
204+
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, true)
205205
}
206206

207207
func TestGetListNonRecursive(t *testing.T) {
208208
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
209209
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
210210
t.Cleanup(terminate)
211-
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher)
211+
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher)
212212
}
213213

214214
func TestGetListNonRecursiveWithConsistentListFromCache(t *testing.T) {
215215
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
216216
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
217217
t.Cleanup(terminate)
218-
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client), cacher)
218+
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher)
219219
}
220220

221221
func TestGetListRecursivePrefix(t *testing.T) {
@@ -301,7 +301,7 @@ func TestWatch(t *testing.T) {
301301
func TestWatchFromZero(t *testing.T) {
302302
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
303303
t.Cleanup(terminate)
304-
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client))
304+
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client))
305305
}
306306

307307
func TestDeleteTriggerWatch(t *testing.T) {

staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
)
2828

2929
func TestCompact(t *testing.T) {
30-
client := testserver.RunEtcd(t, nil)
30+
client := testserver.RunEtcd(t, nil).Client
3131
ctx := context.Background()
3232

3333
putResp, err := client.Put(ctx, "/somekey", "data")
@@ -56,7 +56,7 @@ func TestCompact(t *testing.T) {
5656
// - C1 compacts first. It will succeed.
5757
// - C2 compacts after. It will fail. But it will get latest logical time, which should be larger by one.
5858
func TestCompactConflict(t *testing.T) {
59-
client := testserver.RunEtcd(t, nil)
59+
client := testserver.RunEtcd(t, nil).Client
6060
ctx := context.Background()
6161

6262
putResp, err := client.Put(ctx, "/somekey", "data")

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"time"
2828

2929
clientv3 "go.etcd.io/etcd/client/v3"
30-
_ "go.etcd.io/etcd/client/v3/kubernetes"
30+
"go.etcd.io/etcd/client/v3/kubernetes"
3131
"go.opentelemetry.io/otel/attribute"
3232

3333
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -73,7 +73,7 @@ func (d authenticatedDataString) AuthenticatedData() []byte {
7373
var _ value.Context = authenticatedDataString("")
7474

7575
type store struct {
76-
client *clientv3.Client
76+
client *kubernetes.Client
7777
codec runtime.Codec
7878
versioner storage.Versioner
7979
transformer value.Transformer
@@ -100,11 +100,11 @@ type objState struct {
100100
}
101101

102102
// New returns an etcd3 implementation of storage.Interface.
103-
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
103+
func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
104104
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner)
105105
}
106106

107-
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
107+
func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
108108
// for compatibility with etcd2 impl.
109109
// no-op for default prefix of '/registry'.
110110
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
@@ -115,7 +115,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
115115
}
116116

117117
w := &watcher{
118-
client: c,
118+
client: c.Client,
119119
codec: codec,
120120
newFunc: newFunc,
121121
groupResource: groupResource,
@@ -136,7 +136,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
136136
groupResource: groupResource,
137137
groupResourceString: groupResource.String(),
138138
watcher: w,
139-
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
139+
leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig),
140140
decoder: decoder,
141141
}
142142

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"github.com/go-logr/logr"
3131
clientv3 "go.etcd.io/etcd/client/v3"
32+
"go.etcd.io/etcd/client/v3/kubernetes"
3233
"go.etcd.io/etcd/server/v3/embed"
3334
"google.golang.org/grpc/grpclog"
3435

@@ -95,7 +96,7 @@ func checkStorageInvariants(etcdClient *clientv3.Client, codec runtime.Codec) st
9596

9697
func TestCreate(t *testing.T) {
9798
ctx, store, etcdClient := testSetup(t)
98-
storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient, store.codec))
99+
storagetesting.RunTestCreate(ctx, t, store, checkStorageInvariants(etcdClient.Client, store.codec))
99100
}
100101

101102
func TestCreateWithTTL(t *testing.T) {
@@ -170,7 +171,7 @@ func TestListPaging(t *testing.T) {
170171

171172
func TestGetListNonRecursive(t *testing.T) {
172173
ctx, store, client := testSetup(t)
173-
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client), store)
174+
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client.Client), store)
174175
}
175176

176177
func TestGetListRecursivePrefix(t *testing.T) {
@@ -194,8 +195,8 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes
194195
}
195196

196197
func TestGuaranteedUpdate(t *testing.T) {
197-
ctx, store, etcdClient := testSetup(t)
198-
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(etcdClient, store.codec))
198+
ctx, store, client := testSetup(t)
199+
storagetesting.RunTestGuaranteedUpdate(ctx, t, &storeWithPrefixTransformer{store}, checkStorageInvariants(client.Client, store.codec))
199200
}
200201

201202
func TestGuaranteedUpdateWithTTL(t *testing.T) {
@@ -225,12 +226,12 @@ func TestTransformationFailure(t *testing.T) {
225226

226227
func TestList(t *testing.T) {
227228
ctx, store, client := testSetup(t)
228-
storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
229+
storagetesting.RunTestList(ctx, t, store, compactStorage(client.Client), false)
229230
}
230231

231232
func TestConsistentList(t *testing.T) {
232233
ctx, store, client := testSetup(t)
233-
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client), false, true)
234+
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client.Client), false, true)
234235
}
235236

236237
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
@@ -258,29 +259,29 @@ func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer,
258259
}
259260
}
260261
if reads := recorder.GetReadsAndReset(); reads != estimatedGetCalls {
261-
t.Errorf("unexpected reads: %d", reads)
262+
t.Fatalf("unexpected reads: %d, want: %d", reads, estimatedGetCalls)
262263
}
263264
}
264265
}
265266

266267
func TestListContinuation(t *testing.T) {
267-
ctx, store, etcdClient := testSetup(t, withRecorder())
268+
ctx, store, client := testSetup(t, withRecorder())
268269
validation := checkStorageCallsInvariants(
269-
store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder))
270+
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
270271
storagetesting.RunTestListContinuation(ctx, t, store, validation)
271272
}
272273

273274
func TestListPaginationRareObject(t *testing.T) {
274-
ctx, store, etcdClient := testSetup(t, withRecorder())
275+
ctx, store, client := testSetup(t, withRecorder())
275276
validation := checkStorageCallsInvariants(
276-
store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder))
277+
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
277278
storagetesting.RunTestListPaginationRareObject(ctx, t, store, validation)
278279
}
279280

280281
func TestListContinuationWithFilter(t *testing.T) {
281-
ctx, store, etcdClient := testSetup(t, withRecorder())
282+
ctx, store, client := testSetup(t, withRecorder())
282283
validation := checkStorageCallsInvariants(
283-
store.transformer.(*storagetesting.PrefixTransformer), etcdClient.KV.(*clientRecorder))
284+
store.transformer.(*storagetesting.PrefixTransformer), client.KV.(*clientRecorder))
284285
storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation)
285286
}
286287

@@ -299,7 +300,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
299300

300301
func TestListInconsistentContinuation(t *testing.T) {
301302
ctx, store, client := testSetup(t)
302-
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client))
303+
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client.Client))
303304
}
304305

305306
func TestListResourceVersionMatch(t *testing.T) {
@@ -499,7 +500,7 @@ func (r *clientRecorder) GetReadsAndReset() uint64 {
499500
}
500501

501502
type setupOptions struct {
502-
client func(testing.TB) *clientv3.Client
503+
client func(testing.TB) *kubernetes.Client
503504
codec runtime.Codec
504505
newFunc func() runtime.Object
505506
newListFunc func() runtime.Object
@@ -516,7 +517,7 @@ type setupOption func(*setupOptions)
516517

517518
func withClientConfig(config *embed.Config) setupOption {
518519
return func(options *setupOptions) {
519-
options.client = func(t testing.TB) *clientv3.Client {
520+
options.client = func(t testing.TB) *kubernetes.Client {
520521
return testserver.RunEtcd(t, config)
521522
}
522523
}
@@ -541,7 +542,7 @@ func withRecorder() setupOption {
541542
}
542543

543544
func withDefaults(options *setupOptions) {
544-
options.client = func(t testing.TB) *clientv3.Client {
545+
options.client = func(t testing.TB) *kubernetes.Client {
545546
return testserver.RunEtcd(t, nil)
546547
}
547548
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
@@ -556,7 +557,7 @@ func withDefaults(options *setupOptions) {
556557

557558
var _ setupOption = withDefaults
558559

559-
func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *clientv3.Client) {
560+
func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *kubernetes.Client) {
560561
setupOpts := setupOptions{}
561562
opts = append([]setupOption{withDefaults}, opts...)
562563
for _, opt := range opts {

staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing/test_server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ package testing
1919
import (
2020
"testing"
2121

22-
clientv3 "go.etcd.io/etcd/client/v3"
22+
"go.etcd.io/etcd/client/v3/kubernetes"
2323

2424
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
2525
"k8s.io/apiserver/pkg/storage/storagebackend"
2626
)
2727

2828
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
2929
type EtcdTestServer struct {
30-
V3Client *clientv3.Client
30+
V3Client *kubernetes.Client
3131
}
3232

3333
func (e *EtcdTestServer) Terminate(t testing.TB) {

staging/src/k8s.io/apiserver/pkg/storage/etcd3/testserver/test_server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
clientv3 "go.etcd.io/etcd/client/v3"
29+
"go.etcd.io/etcd/client/v3/kubernetes"
2930
"go.etcd.io/etcd/server/v3/embed"
3031
"go.uber.org/zap/zapcore"
3132
"go.uber.org/zap/zaptest"
@@ -81,7 +82,7 @@ func NewTestConfig(t testing.TB) *embed.Config {
8182
// RunEtcd starts an embedded etcd server with the provided config
8283
// (or NewTestConfig(t) if nil), and returns a client connected to the server.
8384
// The server is terminated when the test ends.
84-
func RunEtcd(t testing.TB, cfg *embed.Config) *clientv3.Client {
85+
func RunEtcd(t testing.TB, cfg *embed.Config) *kubernetes.Client {
8586
t.Helper()
8687

8788
if cfg == nil {
@@ -112,7 +113,7 @@ func RunEtcd(t testing.TB, cfg *embed.Config) *clientv3.Client {
112113
t.Fatal(err)
113114
}
114115

115-
client, err := clientv3.New(clientv3.Config{
116+
client, err := kubernetes.New(clientv3.Config{
116117
TLS: tlsConfig,
117118
Endpoints: e.Server.Cluster().ClientURLs(),
118119
DialTimeout: 10 * time.Second,

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
6464

6565
func TestWatchFromZero(t *testing.T) {
6666
ctx, store, client := testSetup(t)
67-
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client))
67+
storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client.Client))
6868
}
6969

7070
// TestWatchFromNonZero tests that

staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"go.etcd.io/etcd/client/pkg/v3/logutil"
3434
"go.etcd.io/etcd/client/pkg/v3/transport"
3535
clientv3 "go.etcd.io/etcd/client/v3"
36+
"go.etcd.io/etcd/client/v3/kubernetes"
3637
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
3738
"go.uber.org/zap"
3839
"go.uber.org/zap/zapcore"
@@ -228,7 +229,7 @@ func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error)
228229
return nil, err
229230
}
230231
return &etcd3ProberMonitor{
231-
client: client,
232+
client: client.Client,
232233
prefix: c.Prefix,
233234
endpoints: c.Transport.ServerList,
234235
}, nil
@@ -282,7 +283,7 @@ func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetric
282283
}, nil
283284
}
284285

285-
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
286+
var newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
286287
tlsInfo := transport.TLSInfo{
287288
CertFile: c.CertFile,
288289
KeyFile: c.KeyFile,
@@ -352,7 +353,7 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, e
352353
Logger: etcd3ClientLogger,
353354
}
354355

355-
return clientv3.New(cfg)
356+
return kubernetes.New(cfg)
356357
}
357358

358359
type runningCompactor struct {
@@ -384,10 +385,11 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
384385
}
385386
key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile}
386387
if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval {
387-
compactorClient, err := newETCD3Client(c)
388+
client, err := newETCD3Client(c)
388389
if err != nil {
389390
return nil, err
390391
}
392+
compactorClient := client.Client
391393

392394
if foundBefore {
393395
// replace compactor
@@ -439,7 +441,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
439441
// decorate the KV instance so we can track etcd latency per request.
440442
client.KV = etcd3.NewETCDLatencyTracker(client.KV)
441443

442-
stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
444+
stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client.Client, c.DBMetricPollInterval)
443445
if err != nil {
444446
return nil, nil, err
445447
}

0 commit comments

Comments
 (0)