Skip to content

Commit c89e3db

Browse files
kayruskon-angelo
andauthored
[cinder-csi-plugin] Refactor list volumes (#2766) (#2842)
* refactor list volumes call * upgrade tests * comments improvements * fix imports and list options * token split * add more jointoken tests Co-authored-by: Konstantinos Angelopoulos <[email protected]>
1 parent 3776a0b commit c89e3db

File tree

5 files changed

+251
-797
lines changed

5 files changed

+251
-797
lines changed

pkg/csi/cinder/controllerserver.go

Lines changed: 40 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package cinder
1818

1919
import (
2020
"context"
21-
"encoding/json"
2221
"fmt"
22+
"slices"
2323
"sort"
2424
"strconv"
2525

@@ -118,7 +118,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
118118

119119
// Volume Create
120120
properties := map[string]string{cinderCSIClusterIDKey: cs.Driver.cluster}
121-
//Tag volume with metadata if present: https://github.com/kubernetes-csi/external-provisioner/pull/399
121+
// Tag volume with metadata if present: https://github.com/kubernetes-csi/external-provisioner/pull/399
122122
for _, mKey := range []string{"csi.storage.k8s.io/pvc/name", "csi.storage.k8s.io/pvc/namespace", "csi.storage.k8s.io/pv/name"} {
123123
if v, ok := req.Parameters[mKey]; ok {
124124
properties[mKey] = v
@@ -153,7 +153,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
153153
var back *backups.Backup
154154
back, err = cloud.GetBackupByID(snapshotID)
155155
if err != nil {
156-
//If there is an error getting the backup as well, fail.
156+
// If there is an error getting the backup as well, fail.
157157
return nil, status.Errorf(codes.NotFound, "VolumeContentSource Snapshot or Backup with ID %s not found", snapshotID)
158158
}
159159
if back.Status != "available" {
@@ -355,12 +355,6 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
355355
return &csi.ControllerUnpublishVolumeResponse{}, nil
356356
}
357357

358-
type CloudsStartingToken struct {
359-
CloudName string `json:"cloud"`
360-
Token string `json:"token"`
361-
isEmpty bool
362-
}
363-
364358
func (cs *controllerServer) extractNodeIDs(attachments []volumes.Attachment) []string {
365359
nodeIDs := make([]string, len(attachments))
366360
for i, attachment := range attachments {
@@ -392,123 +386,56 @@ func (cs *controllerServer) ListVolumes(ctx context.Context, req *csi.ListVolume
392386
return nil, status.Errorf(codes.InvalidArgument, "[ListVolumes] Invalid max entries request %v, must not be negative ", req.MaxEntries)
393387
}
394388
maxEntries := int(req.MaxEntries)
395-
396389
var err error
397-
var cloudsToken = CloudsStartingToken{
398-
CloudName: "",
399-
Token: "",
400-
isEmpty: len(req.StartingToken) == 0,
401-
}
402390

403391
cloudsNames := maps.Keys(cs.Clouds)
404392
sort.Strings(cloudsNames)
405393

406-
currentCloudName := cloudsNames[0]
394+
var (
395+
token string
396+
idx int
397+
cloudName = cloudsNames[0]
398+
)
407399
if req.StartingToken != "" {
408-
err = json.Unmarshal([]byte(req.StartingToken), &cloudsToken)
409-
if err != nil {
410-
return nil, status.Errorf(codes.Aborted, "[ListVolumes] Invalid request: Token invalid")
411-
}
412-
currentCloudName = cloudsToken.CloudName
413-
}
414-
415-
startingToken := cloudsToken.Token
416-
var cloudsVentries []*csi.ListVolumesResponse_Entry
417-
var vlist []volumes.Volume
418-
var nextPageToken string
419-
420-
if !cloudsToken.isEmpty && startingToken == "" {
421-
// previous call ended on last volumes from "currentCloudName" we should pass to next one
422-
for i := range cloudsNames {
423-
if cloudsNames[i] == currentCloudName {
424-
currentCloudName = cloudsNames[i+1]
425-
break
426-
}
400+
token, cloudName = splitToken(req.StartingToken)
401+
idx = slices.Index(cloudsNames, cloudName)
402+
if idx < 0 {
403+
return nil, status.Errorf(codes.Internal, "[ListVolumes] Invalid request: %s", fmt.Errorf("unknown cloud specified in the request: %v", cloudName))
427404
}
428405
}
429406

430-
startIdx := 0
431-
for _, cloudName := range cloudsNames {
432-
if cloudName == currentCloudName {
433-
break
434-
}
435-
startIdx++
436-
}
437-
for idx := startIdx; idx < len(cloudsNames); idx++ {
438-
if maxEntries > 0 {
439-
vlist, nextPageToken, err = cs.Clouds[cloudsNames[idx]].ListVolumes(maxEntries-len(cloudsVentries), startingToken)
440-
} else {
441-
vlist, nextPageToken, err = cs.Clouds[cloudsNames[idx]].ListVolumes(maxEntries, startingToken)
442-
}
443-
startingToken = nextPageToken
444-
if err != nil {
445-
klog.Errorf("Failed to ListVolumes: %v", err)
446-
if cpoerrors.IsInvalidError(err) {
447-
return nil, status.Errorf(codes.Aborted, "[ListVolumes] Invalid request: %v", err)
448-
}
449-
return nil, status.Errorf(codes.Internal, "ListVolumes failed with error %v", err)
450-
}
451-
452-
ventries := cs.createVolumeEntries(vlist)
453-
klog.V(4).Infof("ListVolumes: retrieved %d entries and %q next token from cloud %q", len(ventries), nextPageToken, cloudsNames[idx])
454-
455-
cloudsVentries = append(cloudsVentries, ventries...)
456-
457-
// Reach maxEntries setup nextToken with cloud identifier if needed
458-
sendEmptyToken := false
459-
if maxEntries > 0 && len(cloudsVentries) == maxEntries {
460-
if nextPageToken == "" {
461-
if idx+1 == len(cloudsNames) {
462-
// no more entries and no more clouds
463-
// send no token its finished
464-
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(cloudsVentries), "")
465-
return &csi.ListVolumesResponse{
466-
Entries: cloudsVentries,
467-
NextToken: "",
468-
}, nil
469-
} else {
470-
// still clouds to process
471-
// set token to next non empty cloud
472-
i := 0
473-
for i = idx + 1; i < len(cloudsNames); i++ {
474-
vlistTmp, _, err := cs.Clouds[cloudsNames[i]].ListVolumes(1, "")
475-
if err != nil {
476-
klog.Errorf("Failed to ListVolumes: %v", err)
477-
if cpoerrors.IsInvalidError(err) {
478-
return nil, status.Errorf(codes.Aborted, "[ListVolumes] Invalid request: %v", err)
479-
}
480-
return nil, status.Errorf(codes.Internal, "ListVolumes failed with error %v", err)
481-
}
482-
if len(vlistTmp) > 0 {
483-
cloudsToken.CloudName = cloudsNames[i]
484-
cloudsToken.isEmpty = false
485-
break
486-
}
487-
}
488-
if i == len(cloudsNames) {
489-
sendEmptyToken = true
490-
}
491-
}
492-
}
493-
cloudsToken.CloudName = cloudsNames[idx]
494-
cloudsToken.Token = nextPageToken
495-
var data []byte
496-
data, _ = json.Marshal(cloudsToken)
497-
if sendEmptyToken {
498-
data = []byte("")
499-
}
500-
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(cloudsVentries), string(data))
501-
return &csi.ListVolumesResponse{
502-
Entries: cloudsVentries,
503-
NextToken: string(data),
504-
}, nil
407+
var volumeList []volumes.Volume
408+
volumeList, token, err = cs.Clouds[cloudName].ListVolumes(maxEntries, token)
409+
if err != nil {
410+
klog.Errorf("Failed to ListVolumes: %v", err)
411+
if cpoerrors.IsInvalidError(err) {
412+
return nil, status.Errorf(codes.Aborted, "[ListVolumes] Invalid request: %v", err)
505413
}
414+
return nil, status.Errorf(codes.Internal, "ListVolumes failed with error %v", err)
415+
}
416+
volumeEntries := cs.createVolumeEntries(volumeList)
417+
klog.V(4).Infof("ListVolumes: retrieved %d entries and %q next token from cloud %q", len(volumeEntries), token, cloudName)
418+
419+
switch {
420+
// if we have not finished listing all volumes from this cloud, we will continue on next call.
421+
case token != "":
422+
// if we listed all volumes from this cloud but more clouds exist, return a token of the next cloud.
423+
case idx+1 < len(cloudsNames):
424+
cloudName = cloudsNames[idx+1]
425+
default:
426+
// work is done.
427+
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(volumeEntries), "")
428+
return &csi.ListVolumesResponse{
429+
Entries: volumeEntries,
430+
NextToken: "",
431+
}, nil
506432
}
507433

508-
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(cloudsVentries), "")
434+
nextToken := joinToken(token, cloudName)
435+
klog.V(4).Infof("ListVolumes: completed with %d entries and %q next token", len(volumeEntries), nextToken)
509436
return &csi.ListVolumesResponse{
510-
Entries: cloudsVentries,
511-
NextToken: "",
437+
Entries: volumeEntries,
438+
NextToken: nextToken,
512439
}, nil
513440
}
514441

0 commit comments

Comments
 (0)