Skip to content

Commit a16a364

Browse files
committed
Migrate GetList to Kubernetes client
1 parent e192ac3 commit a16a364

File tree

1 file changed

+35
-32
lines changed
  • staging/src/k8s.io/apiserver/pkg/storage/etcd3

1 file changed

+35
-32
lines changed

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts sto
629629

630630
// GetList implements storage.Interface.
631631
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
632-
preparedKey, err := s.prepareKey(key)
632+
keyPrefix, err := s.prepareKey(key)
633633
if err != nil {
634634
return err
635635
}
@@ -654,27 +654,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
654654
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
655655
// with prefix "/a" will return all three, while with prefix "/a/" will return only
656656
// "/a/b" which is the correct answer.
657-
if opts.Recursive && !strings.HasSuffix(preparedKey, "/") {
658-
preparedKey += "/"
657+
if opts.Recursive && !strings.HasSuffix(keyPrefix, "/") {
658+
keyPrefix += "/"
659659
}
660-
keyPrefix := preparedKey
661660

662661
// set the appropriate clientv3 options to filter the returned data set
663-
var limitOption *clientv3.OpOption
664662
limit := opts.Predicate.Limit
665-
var paging bool
666-
options := make([]clientv3.OpOption, 0, 4)
667-
if opts.Predicate.Limit > 0 {
668-
paging = true
669-
options = append(options, clientv3.WithLimit(limit))
670-
limitOption = &options[len(options)-1]
671-
}
672-
673-
if opts.Recursive {
674-
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
675-
options = append(options, clientv3.WithRange(rangeEnd))
676-
}
677-
663+
paging := opts.Predicate.Limit > 0
678664
newItemFunc := getNewItemFunc(listObj, v)
679665

680666
var continueRV, withRev int64
@@ -684,20 +670,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
684670
if err != nil {
685671
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
686672
}
687-
preparedKey = continueKey
688673
}
689674
if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil {
690675
return err
691676
}
692677

693-
if withRev != 0 {
694-
options = append(options, clientv3.WithRev(withRev))
695-
}
696-
697678
// loop until we have filled the requested limit from etcd or there are no more results
698679
var lastKey []byte
699680
var hasMore bool
700-
var getResp *clientv3.GetResponse
681+
var getResp kubernetes.ListResponse
701682
var numFetched int
702683
var numEvald int
703684
// Because these metrics are for understanding the costs of handling LIST requests,
@@ -714,24 +695,27 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
714695

715696
for {
716697
startTime := time.Now()
717-
getResp, err = s.client.KV.Get(ctx, preparedKey, options...)
698+
getResp, err = s.getList(ctx, keyPrefix, opts.Recursive, kubernetes.ListOptions{
699+
Revision: withRev,
700+
Limit: limit,
701+
Continue: continueKey,
702+
})
718703
metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime)
719704
if err != nil {
720705
return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix)
721706
}
722707
numFetched += len(getResp.Kvs)
723-
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
708+
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Revision)); err != nil {
724709
return err
725710
}
726-
hasMore = getResp.More
711+
hasMore = int64(len(getResp.Kvs)) < getResp.Count
727712

728-
if len(getResp.Kvs) == 0 && getResp.More {
713+
if len(getResp.Kvs) == 0 && hasMore {
729714
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
730715
}
731716
// indicate to the client which resource version was returned, and use the same resource version for subsequent requests.
732717
if withRev == 0 {
733-
withRev = getResp.Header.Revision
734-
options = append(options, clientv3.WithRev(withRev))
718+
withRev = getResp.Revision
735719
}
736720

737721
// avoid small allocations for the result slice, since this can be called in many
@@ -779,6 +763,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
779763
// free kv early. Long lists can take O(seconds) to decode.
780764
getResp.Kvs[i] = nil
781765
}
766+
continueKey = string(lastKey) + "\x00"
782767

783768
// no more results remain or we didn't request paging
784769
if !hasMore || !paging {
@@ -796,9 +781,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
796781
if limit > maxLimit {
797782
limit = maxLimit
798783
}
799-
*limitOption = clientv3.WithLimit(limit)
800784
}
801-
preparedKey = string(lastKey) + "\x00"
802785
}
803786

804787
if v.IsNil() {
@@ -813,6 +796,26 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
813796
return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount)
814797
}
815798

799+
func (s *store) getList(ctx context.Context, keyPrefix string, recursive bool, options kubernetes.ListOptions) (kubernetes.ListResponse, error) {
800+
if recursive {
801+
return s.client.Kubernetes.List(ctx, keyPrefix, options)
802+
}
803+
getResp, err := s.client.Kubernetes.Get(ctx, keyPrefix, kubernetes.GetOptions{
804+
Revision: options.Revision,
805+
})
806+
var resp kubernetes.ListResponse
807+
if getResp.KV != nil {
808+
resp.Kvs = []*mvccpb.KeyValue{getResp.KV}
809+
resp.Count = 1
810+
resp.Revision = getResp.Revision
811+
} else {
812+
resp.Kvs = []*mvccpb.KeyValue{}
813+
resp.Count = 0
814+
resp.Revision = getResp.Revision
815+
}
816+
return resp, err
817+
}
818+
816819
// growSlice takes a slice value and grows its capacity up
817820
// to the maximum of the passed sizes or maxCapacity, whichever
818821
// is smaller. Above maxCapacity decisions about allocation are left

0 commit comments

Comments
 (0)