@@ -19,13 +19,13 @@ package etcd3
19
19
import (
20
20
"bytes"
21
21
"context"
22
- "errors"
23
22
"fmt"
24
23
"path"
25
24
"reflect"
26
25
"strings"
27
26
"time"
28
27
28
+ "go.etcd.io/etcd/api/v3/mvccpb"
29
29
clientv3 "go.etcd.io/etcd/client/v3"
30
30
"go.etcd.io/etcd/client/v3/kubernetes"
31
31
"go.opentelemetry.io/otel/attribute"
@@ -347,37 +347,25 @@ func (s *store) conditionalDelete(
347
347
}
348
348
349
349
startTime := time .Now ()
350
- txnResp , err := s .client .KV .Txn (ctx ).If (
351
- clientv3 .Compare (clientv3 .ModRevision (key ), "=" , origState .rev ),
352
- ).Then (
353
- clientv3 .OpDelete (key ),
354
- ).Else (
355
- clientv3 .OpGet (key ),
356
- ).Commit ()
350
+ txnResp , err := s .client .Kubernetes .OptimisticDelete (ctx , key , origState .rev , kubernetes.DeleteOptions {
351
+ GetOnFailure : true ,
352
+ })
357
353
metrics .RecordEtcdRequest ("delete" , s .groupResourceString , err , startTime )
358
354
if err != nil {
359
355
return err
360
356
}
361
357
if ! txnResp .Succeeded {
362
- getResp := (* clientv3 .GetResponse )(txnResp .Responses [0 ].GetResponseRange ())
363
358
klog .V (4 ).Infof ("deletion of %s failed because of a conflict, going to retry" , key )
364
- origState , err = s .getState (ctx , getResp , key , v , false , skipTransformDecode )
359
+ origState , err = s .getState (ctx , txnResp . KV , key , v , false , skipTransformDecode )
365
360
if err != nil {
366
361
return err
367
362
}
368
363
origStateIsCurrent = true
369
364
continue
370
365
}
371
366
372
- if len (txnResp .Responses ) == 0 || txnResp .Responses [0 ].GetResponseDeleteRange () == nil {
373
- return errors .New (fmt .Sprintf ("invalid DeleteRange response: %v" , txnResp .Responses ))
374
- }
375
- deleteResp := txnResp .Responses [0 ].GetResponseDeleteRange ()
376
- if deleteResp .Header == nil {
377
- return errors .New ("invalid DeleteRange response - nil header" )
378
- }
379
367
if ! skipTransformDecode {
380
- err = s .decoder .Decode (origState .data , out , deleteResp . Header .Revision )
368
+ err = s .decoder .Decode (origState .data , out , txnResp .Revision )
381
369
if err != nil {
382
370
recordDecodeError (s .groupResourceString , key )
383
371
return err
@@ -510,20 +498,21 @@ func (s *store) GuaranteedUpdate(
510
498
}
511
499
span .AddEvent ("TransformToStorage succeeded" )
512
500
513
- opts , err := s .ttlOpts (ctx , int64 (ttl ))
514
- if err != nil {
515
- return err
501
+ var lease clientv3.LeaseID
502
+ if ttl != 0 {
503
+ lease , err = s .leaseManager .GetLease (ctx , int64 (ttl ))
504
+ if err != nil {
505
+ return err
506
+ }
516
507
}
517
508
span .AddEvent ("Transaction prepared" )
518
509
519
510
startTime := time .Now ()
520
- txnResp , err := s .client .KV .Txn (ctx ).If (
521
- clientv3 .Compare (clientv3 .ModRevision (preparedKey ), "=" , origState .rev ),
522
- ).Then (
523
- clientv3 .OpPut (preparedKey , string (newData ), opts ... ),
524
- ).Else (
525
- clientv3 .OpGet (preparedKey ),
526
- ).Commit ()
511
+
512
+ txnResp , err := s .client .Kubernetes .OptimisticPut (ctx , preparedKey , newData , origState .rev , kubernetes.PutOptions {
513
+ GetOnFailure : true ,
514
+ LeaseID : lease ,
515
+ })
527
516
metrics .RecordEtcdRequest ("update" , s .groupResourceString , err , startTime )
528
517
if err != nil {
529
518
span .AddEvent ("Txn call failed" , attribute .String ("err" , err .Error ()))
@@ -532,20 +521,17 @@ func (s *store) GuaranteedUpdate(
532
521
span .AddEvent ("Txn call completed" )
533
522
span .AddEvent ("Transaction committed" )
534
523
if ! txnResp .Succeeded {
535
- getResp := (* clientv3 .GetResponse )(txnResp .Responses [0 ].GetResponseRange ())
536
524
klog .V (4 ).Infof ("GuaranteedUpdate of %s failed because of a conflict, going to retry" , preparedKey )
537
- skipTransformDecode := false
538
- origState , err = s .getState (ctx , getResp , preparedKey , v , ignoreNotFound , skipTransformDecode )
525
+ origState , err = s .getState (ctx , txnResp .KV , preparedKey , v , ignoreNotFound , skipTransformDecode )
539
526
if err != nil {
540
527
return err
541
528
}
542
529
span .AddEvent ("Retry value restored" )
543
530
origStateIsCurrent = true
544
531
continue
545
532
}
546
- putResp := txnResp .Responses [0 ].GetResponsePut ()
547
533
548
- err = s .decoder .Decode (data , destination , putResp . Header .Revision )
534
+ err = s .decoder .Decode (data , destination , txnResp .Revision )
549
535
if err != nil {
550
536
span .AddEvent ("decode failed" , attribute .Int ("len" , len (data )), attribute .String ("err" , err .Error ()))
551
537
recordDecodeError (s .groupResourceString , preparedKey )
@@ -885,12 +871,12 @@ func (s *store) watchContext(ctx context.Context) context.Context {
885
871
func (s * store ) getCurrentState (ctx context.Context , key string , v reflect.Value , ignoreNotFound bool , skipTransformDecode bool ) func () (* objState , error ) {
886
872
return func () (* objState , error ) {
887
873
startTime := time .Now ()
888
- getResp , err := s .client .KV .Get (ctx , key )
874
+ getResp , err := s .client .Kubernetes .Get (ctx , key , kubernetes. GetOptions {} )
889
875
metrics .RecordEtcdRequest ("get" , s .groupResourceString , err , startTime )
890
876
if err != nil {
891
877
return nil , err
892
878
}
893
- return s .getState (ctx , getResp , key , v , ignoreNotFound , skipTransformDecode )
879
+ return s .getState (ctx , getResp . KV , key , v , ignoreNotFound , skipTransformDecode )
894
880
}
895
881
}
896
882
@@ -900,7 +886,7 @@ func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value
900
886
// storage will be transformed and decoded.
901
887
// NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields
902
888
// of the objState will be nil, and 'stale' will be set to true.
903
- func (s * store ) getState (ctx context.Context , getResp * clientv3. GetResponse , key string , v reflect.Value , ignoreNotFound bool , skipTransformDecode bool ) (* objState , error ) {
889
+ func (s * store ) getState (ctx context.Context , kv * mvccpb. KeyValue , key string , v reflect.Value , ignoreNotFound bool , skipTransformDecode bool ) (* objState , error ) {
904
890
state := & objState {
905
891
meta : & storage.ResponseMeta {},
906
892
}
@@ -911,15 +897,15 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
911
897
state .obj = reflect .New (v .Type ()).Interface ().(runtime.Object )
912
898
}
913
899
914
- if len ( getResp . Kvs ) == 0 {
900
+ if kv == nil {
915
901
if ! ignoreNotFound {
916
902
return nil , storage .NewKeyNotFoundError (key , 0 )
917
903
}
918
904
if err := runtime .SetZeroValue (state .obj ); err != nil {
919
905
return nil , err
920
906
}
921
907
} else {
922
- state .rev = getResp . Kvs [ 0 ] .ModRevision
908
+ state .rev = kv .ModRevision
923
909
state .meta .ResourceVersion = uint64 (state .rev )
924
910
925
911
if skipTransformDecode {
@@ -929,7 +915,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
929
915
return state , nil
930
916
}
931
917
932
- data , stale , err := s .transformer .TransformFromStorage (ctx , getResp . Kvs [ 0 ] .Value , authenticatedDataString (key ))
918
+ data , stale , err := s .transformer .TransformFromStorage (ctx , kv .Value , authenticatedDataString (key ))
933
919
if err != nil {
934
920
return nil , storage .NewInternalError (err )
935
921
}
@@ -989,19 +975,6 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim
989
975
return ret , ttl , nil
990
976
}
991
977
992
- // ttlOpts returns client options based on given ttl.
993
- // ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
994
- func (s * store ) ttlOpts (ctx context.Context , ttl int64 ) ([]clientv3.OpOption , error ) {
995
- if ttl == 0 {
996
- return nil , nil
997
- }
998
- id , err := s .leaseManager .GetLease (ctx , ttl )
999
- if err != nil {
1000
- return nil , err
1001
- }
1002
- return []clientv3.OpOption {clientv3 .WithLease (id )}, nil
1003
- }
1004
-
1005
978
// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
1006
979
// greater than the most recent actualRevision available from storage.
1007
980
func (s * store ) validateMinimumResourceVersion (minimumResourceVersion string , actualRevision uint64 ) error {
0 commit comments