Skip to content

Commit e131d98

Browse files
authored
Merge pull request #1696 from pwschuurman/list-volumes-instances-api
Use instances.list to determine PublishedNodeIds in ListVolume API
2 parents eead51b + 8874215 commit e131d98

File tree

10 files changed

+465
-40
lines changed

10 files changed

+465
-40
lines changed

cmd/gce-pd-csi-driver/main.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ var (
7979
computeEndpoint *url.URL
8080
allowedComputeEnvironment = []gce.Environment{gce.EnvironmentStaging, gce.EnvironmentProduction}
8181

82-
useInstanceAPIOnWaitForAttachDiskTypesFlag = flag.String("use-instance-api-to-poll-attachment-disk-types", "", "Comma separated list of disk types that should use instances.get API when polling for disk attach during ControllerPublish")
82+
useInstanceAPIOnWaitForAttachDiskTypesFlag = flag.String("use-instance-api-to-poll-attachment-disk-types", "", "Comma separated list of disk types that should use instances.get API when polling for disk attach during ControllerPublish")
83+
useInstanceAPIForListVolumesPublishedNodesFlag = flag.Bool("use-instance-api-to-list-volumes-published-nodes", false, "Enables using the instances.list API to determine published_node_ids in ListVolumes. When false (default), the disks.list API is used")
84+
instancesListFiltersFlag = flag.String("instances-list-filters", "", "Comma separated list of filters to use when calling the instances.list API. By default instances.list fetches all instances in a region")
8385

8486
extraTagsStr = flag.String("extra-tags", "", "Extra tags to attach to each Compute Disk, Image, Snapshot created. It is a comma separated list of parent id, key and value like '<parent_id1>/<tag_key1>/<tag_value1>,...,<parent_idN>/<tag_keyN>/<tag_valueN>'. parent_id is the Organization or the Project ID or Project name where the tag key and the tag value resources exist. A maximum of 50 tags bindings is allowed for a resource. See https://cloud.google.com/resource-manager/docs/tags/tags-overview, https://cloud.google.com/resource-manager/docs/tags/tags-creating-and-managing for details")
8587

@@ -188,16 +190,25 @@ func handle() {
188190
UseInstancesAPIForDiskTypes: useInstanceAPIOnWaitForAttachDiskTypes,
189191
}
190192

193+
// Initialize listVolumes config
194+
instancesListFilters := strings.Split(*instancesListFiltersFlag, ",")
195+
listInstancesConfig := gce.ListInstancesConfig{
196+
Filters: instancesListFilters,
197+
}
198+
listVolumesConfig := driver.ListVolumesConfig{
199+
UseInstancesAPIForPublishedNodes: *useInstanceAPIForListVolumesPublishedNodesFlag,
200+
}
201+
191202
// Initialize requirements for the controller service
192203
var controllerServer *driver.GCEControllerServer
193204
if *runControllerService {
194-
cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig)
205+
cloudProvider, err := gce.CreateCloudProvider(ctx, version, *cloudConfigFilePath, computeEndpoint, computeEnvironment, waitForAttachConfig, listInstancesConfig)
195206
if err != nil {
196207
klog.Fatalf("Failed to get cloud provider: %v", err.Error())
197208
}
198209
initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
199210
maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
200-
controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, multiZoneVolumeHandleConfig)
211+
controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, multiZoneVolumeHandleConfig, listVolumesConfig)
201212
} else if *cloudConfigFilePath != "" {
202213
klog.Warningf("controller service is disabled but cloud config given - it has no effect")
203214
}

pkg/gce-cloud-provider/compute/fake-gce.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,22 @@ func (cloud *FakeCloudProvider) ListZones(ctx context.Context, region string) ([
127127
return []string{cloud.zone, "country-region-fakesecondzone"}, nil
128128
}
129129

130-
func (cloud *FakeCloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, string, error) {
130+
// ListDisks returns the compute disk of every entry held by the fake provider.
// The fields argument is ignored by the fake, and the returned page token is
// always empty.
func (cloud *FakeCloudProvider) ListDisks(ctx context.Context, fields []googleapi.Field) ([]*computev1.Disk, string, error) {
	disks := make([]*computev1.Disk, 0, len(cloud.disks))
	for _, entry := range cloud.disks {
		disks = append(disks, entry.disk)
	}
	return disks, "", nil
}
137137

138+
func (cloud *FakeCloudProvider) ListInstances(ctx context.Context, fields []googleapi.Field) ([]*computev1.Instance, string, error) {
139+
instances := []*computev1.Instance{}
140+
for _, instance := range cloud.instances {
141+
instances = append(instances, instance)
142+
}
143+
return instances, "", nil
144+
}
145+
138146
func (cloud *FakeCloudProvider) ListSnapshots(ctx context.Context, filter string) ([]*computev1.Snapshot, string, error) {
139147
var sourceDisk string
140148
snapshots := []*computev1.Snapshot{}

pkg/gce-cloud-provider/compute/gce-compute.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"golang.org/x/oauth2"
3434
computebeta "google.golang.org/api/compute/v0.beta"
3535
computev1 "google.golang.org/api/compute/v1"
36+
"google.golang.org/api/googleapi"
3637
"google.golang.org/api/iterator"
3738
"google.golang.org/grpc/codes"
3839
"google.golang.org/grpc/status"
@@ -106,7 +107,8 @@ type GCECompute interface {
106107
GetDiskTypeURI(project string, volKey *meta.Key, diskType string) string
107108
WaitForAttach(ctx context.Context, project string, volKey *meta.Key, diskType, instanceZone, instanceName string) error
108109
ResizeDisk(ctx context.Context, project string, volKey *meta.Key, requestBytes int64) (int64, error)
109-
ListDisks(ctx context.Context) ([]*computev1.Disk, string, error)
110+
ListDisks(ctx context.Context, fields []googleapi.Field) ([]*computev1.Disk, string, error)
111+
ListInstances(ctx context.Context, fields []googleapi.Field) ([]*computev1.Instance, string, error)
110112
// Regional Disk Methods
111113
GetReplicaZoneURI(project string, zone string) string
112114
// Instance Methods
@@ -135,7 +137,7 @@ func (cloud *CloudProvider) GetDefaultZone() string {
135137

136138
// ListDisks lists the regional and zonal disks in the project and region
137139
// that the driver is running in, requesting only the given response fields.
138-
func (cloud *CloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, string, error) {
140+
func (cloud *CloudProvider) ListDisks(ctx context.Context, fields []googleapi.Field) ([]*computev1.Disk, string, error) {
139141
region, err := common.GetRegionFromZones([]string{cloud.zone})
140142
if err != nil {
141143
return nil, "", fmt.Errorf("failed to get region from zones: %w", err)
@@ -148,6 +150,7 @@ func (cloud *CloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, s
148150

149151
// listing out regional disks in the region
150152
rlCall := cloud.service.RegionDisks.List(cloud.project, region)
153+
rlCall.Fields(fields...)
151154
nextPageToken := "pageToken"
152155
for nextPageToken != "" {
153156
rDiskList, err := rlCall.Do()
@@ -162,6 +165,7 @@ func (cloud *CloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, s
162165
// listing out zonal disks in all zones of the region
163166
for _, zone := range zones {
164167
lCall := cloud.service.Disks.List(cloud.project, zone)
168+
lCall.Fields(fields...)
165169
nextPageToken := "pageToken"
166170
for nextPageToken != "" {
167171
diskList, err := lCall.Do()
@@ -176,6 +180,41 @@ func (cloud *CloudProvider) ListDisks(ctx context.Context) ([]*computev1.Disk, s
176180
return items, "", nil
177181
}
178182

183+
// ListInstances lists instances based on maxEntries and pageToken for the project and region
184+
// that the driver is running in. Filters from cloud.listInstancesConfig.Filters are applied
185+
// to the request.
186+
func (cloud *CloudProvider) ListInstances(ctx context.Context, fields []googleapi.Field) ([]*computev1.Instance, string, error) {
187+
region, err := common.GetRegionFromZones([]string{cloud.zone})
188+
if err != nil {
189+
return nil, "", fmt.Errorf("failed to get region from zones: %w", err)
190+
}
191+
zones, err := cloud.ListZones(ctx, region)
192+
if err != nil {
193+
return nil, "", err
194+
}
195+
items := []*computev1.Instance{}
196+
197+
for _, zone := range zones {
198+
lCall := cloud.service.Instances.List(cloud.project, zone)
199+
for _, filter := range cloud.listInstancesConfig.Filters {
200+
lCall = lCall.Filter(filter)
201+
}
202+
lCall = lCall.Fields(fields...)
203+
nextPageToken := "pageToken"
204+
for nextPageToken != "" {
205+
instancesList, err := lCall.Do()
206+
if err != nil {
207+
return nil, "", err
208+
}
209+
items = append(items, instancesList.Items...)
210+
nextPageToken = instancesList.NextPageToken
211+
lCall.PageToken(nextPageToken)
212+
}
213+
}
214+
215+
return items, "", nil
216+
}
217+
179218
// RepairUnderspecifiedVolumeKey will query the cloud provider and check each zone for the disk specified
180219
// by the volume key and return a volume key with a correct zone
181220
func (cloud *CloudProvider) RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error) {

pkg/gce-cloud-provider/compute/gce.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ type CloudProvider struct {
102102
waitForAttachConfig WaitForAttachConfig
103103

104104
tagsRateLimiter *rate.Limiter
105+
106+
listInstancesConfig ListInstancesConfig
105107
}
106108

107109
var _ GCECompute = &CloudProvider{}
@@ -110,6 +112,10 @@ type ConfigFile struct {
110112
Global ConfigGlobal `gcfg:"global"`
111113
}
112114

115+
// ListInstancesConfig carries the settings the cloud provider applies when it
// calls the instances.list API.
type ListInstancesConfig struct {
116+
// Filters holds the filter expressions that ListInstances passes to the
// instances.list call through its Filter option.
Filters []string
117+
}
118+
113119
type WaitForAttachConfig struct {
114120
// A set of disk types that should use the compute instances.get API instead of the
115121
// disks.get API. For certain disk types, using the instances.get API is preferred
@@ -128,7 +134,7 @@ type ConfigGlobal struct {
128134
Zone string `gcfg:"zone"`
129135
}
130136

131-
func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath string, computeEndpoint *url.URL, computeEnvironment Environment, waitForAttachConfig WaitForAttachConfig) (*CloudProvider, error) {
137+
func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath string, computeEndpoint *url.URL, computeEnvironment Environment, waitForAttachConfig WaitForAttachConfig, listInstancesConfig ListInstancesConfig) (*CloudProvider, error) {
132138
configFile, err := readConfig(configPath)
133139
if err != nil {
134140
return nil, err
@@ -168,6 +174,7 @@ func CreateCloudProvider(ctx context.Context, vendorVersion string, configPath s
168174
zone: zone,
169175
zonesCache: make(map[string]([]string)),
170176
waitForAttachConfig: waitForAttachConfig,
177+
listInstancesConfig: listInstancesConfig,
171178
// GCP has a rate limit of 600 requests per minute, restricting
172179
// here to 8 requests per second.
173180
tagsRateLimiter: common.NewLimiter(gcpTagsRequestRateLimit, gcpTagsRequestTokenBucketSize, true),

pkg/gce-pd-csi-driver/controller.go

Lines changed: 92 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
2929
csi "github.com/container-storage-interface/spec/lib/go/csi"
3030
compute "google.golang.org/api/compute/v1"
31+
"google.golang.org/api/googleapi"
3132
"google.golang.org/grpc/codes"
3233
"google.golang.org/grpc/status"
3334
"google.golang.org/protobuf/types/known/timestamppb"
@@ -105,6 +106,8 @@ type GCEControllerServer struct {
105106
enableStoragePools bool
106107

107108
multiZoneVolumeHandleConfig MultiZoneVolumeHandleConfig
109+
110+
listVolumesConfig ListVolumesConfig
108111
}
109112

110113
type MultiZoneVolumeHandleConfig struct {
@@ -124,6 +127,21 @@ type MultiZoneVolumeHandleConfig struct {
124127
Enable bool
125128
}
126129

130+
// ListVolumesConfig controls how the controller server gathers the data for
// ListVolumes responses.
type ListVolumesConfig struct {
131+
// UseInstancesAPIForPublishedNodes, when true, makes the controller list
// instances in addition to disks and leave the "users" field out of the
// disks listing.
UseInstancesAPIForPublishedNodes bool
132+
}
133+
134+
func (c ListVolumesConfig) listDisksFields() []googleapi.Field {
135+
if c.UseInstancesAPIForPublishedNodes {
136+
// If we are using the instances.list API in ListVolumes,
137+
// don't include the users field in the response, as an optimization.
138+
// We rely on instances.list items.disks for attachment pairings.
139+
return listDisksFieldsWithoutUsers
140+
}
141+
142+
return listDisksFieldsWithUsers
143+
}
144+
127145
type csiErrorBackoffId string
128146

129147
type csiErrorBackoff struct {
@@ -175,10 +193,28 @@ const (
175193
resourceApiScheme = "https"
176194
resourceApiService = "compute"
177195
resourceProject = "projects"
196+
197+
listDisksUsersField = googleapi.Field("items/users")
178198
)
179199

180200
var (
181201
validResourceApiVersions = map[string]bool{"v1": true, "alpha": true, "beta": true, "staging_v1": true, "staging_beta": true, "staging_alpha": true}
202+
203+
// By default GCE returns a lot of data for each instance. Request only a subset of the fields.
204+
listInstancesFields = []googleapi.Field{
205+
"items/disks/deviceName",
206+
"items/disks/source",
207+
"items/selfLink",
208+
"nextPageToken",
209+
}
210+
211+
// By default GCE returns a lot of data for each disk. Request only a subset of the fields.
212+
listDisksFieldsWithoutUsers = []googleapi.Field{
213+
"items/labels",
214+
"items/selfLink",
215+
"nextPageToken",
216+
}
217+
listDisksFieldsWithUsers = append(listDisksFieldsWithoutUsers, "items/users")
182218
)
183219

184220
func isDiskReady(disk *gce.CloudDisk) (bool, error) {
@@ -935,6 +971,22 @@ func generateFailedValidationMessage(format string, a ...interface{}) *csi.Valid
935971
}
936972
}
937973

974+
func (gceCS *GCEControllerServer) listVolumeEntries(ctx context.Context) ([]*csi.ListVolumesResponse_Entry, error) {
975+
diskList, _, err := gceCS.CloudProvider.ListDisks(ctx, gceCS.listVolumesConfig.listDisksFields())
976+
if err != nil {
977+
return nil, err
978+
}
979+
980+
var instanceList []*compute.Instance = nil
981+
if gceCS.listVolumesConfig.UseInstancesAPIForPublishedNodes {
982+
instanceList, _, err = gceCS.CloudProvider.ListInstances(ctx, listInstancesFields)
983+
if err != nil {
984+
return nil, err
985+
}
986+
}
987+
return gceCS.disksAndInstancesToVolumeEntries(diskList, instanceList), nil
988+
}
989+
938990
func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
939991
// https://cloud.google.com/compute/docs/reference/beta/disks/list
940992
if req.MaxEntries < 0 {
@@ -944,19 +996,15 @@ func (gceCS *GCEControllerServer) ListVolumes(ctx context.Context, req *csi.List
944996

945997
offsetLow := 0
946998
var ok bool
947-
var volumeEntries []*csi.ListVolumesResponse_Entry
948999
if req.StartingToken == "" {
949-
diskList, _, err := gceCS.CloudProvider.ListDisks(ctx)
1000+
volumeEntries, err := gceCS.listVolumeEntries(ctx)
9501001
if err != nil {
9511002
if gce.IsGCEInvalidError(err) {
9521003
return nil, status.Errorf(codes.Aborted, "ListVolumes error with invalid request: %v", err.Error())
9531004
}
954-
return nil, common.LoggedError("Failed to list disk: ", err)
1005+
return nil, common.LoggedError("Failed to list volumes: ", err)
9551006
}
956-
volumeEntries = gceCS.disksToVolumeEntries(diskList)
957-
}
9581007

959-
if req.StartingToken == "" {
9601008
gceCS.volumeEntries = volumeEntries
9611009
gceCS.volumeEntriesSeen = map[string]int{}
9621010
} else {
@@ -1008,47 +1056,62 @@ func isMultiZoneDisk(diskRsrc string, diskLabels map[string]string) (string, boo
10081056
return multiZoneVolumeId, true
10091057
}
10101058

1011-
// disksToVolumeEntries converts a list of disks to a list of CSI ListVolumeResponse entries
1059+
// disksAndInstancesToVolumeEntries converts a list of disks and instances to a list
1060+
// of CSI ListVolumeResponse entries.
10121061
// It appends "multi-zone" volumeHandles at the end. These are volumeHandles which
10131062
// map to multiple volumeHandles in different zones
1014-
func (gceCS *GCEControllerServer) disksToVolumeEntries(disks []*compute.Disk) []*csi.ListVolumesResponse_Entry {
1015-
multiZoneNodesByVolumeId := map[string][]string{}
1016-
entries := []*csi.ListVolumesResponse_Entry{}
1063+
func (gceCS *GCEControllerServer) disksAndInstancesToVolumeEntries(disks []*compute.Disk, instances []*compute.Instance) []*csi.ListVolumesResponse_Entry {
1064+
nodesByVolumeId := map[string][]string{}
1065+
multiZoneVolumeIdsByVolumeId := map[string]string{}
10171066
for _, d := range disks {
1018-
diskRsrc, err := getResourceId(d.SelfLink)
1067+
volumeId, err := getResourceId(d.SelfLink)
10191068
if err != nil {
10201069
klog.Warningf("Bad ListVolumes disk resource %s, skipped: %v (%+v)", d.SelfLink, err, d)
10211070
continue
10221071
}
1023-
users := []string{}
1072+
1073+
instanceIds := make([]string, len(d.Users))
10241074
for _, u := range d.Users {
1025-
rsrc, err := getResourceId(u)
1075+
instanceId, err := getResourceId(u)
10261076
if err != nil {
10271077
klog.Warningf("Bad ListVolumes user %s, skipped: %v", u, err)
10281078
} else {
1029-
users = append(users, rsrc)
1079+
instanceIds = append(instanceIds, instanceId)
10301080
}
10311081
}
10321082

1083+
nodesByVolumeId[volumeId] = instanceIds
1084+
10331085
if gceCS.multiZoneVolumeHandleConfig.Enable {
1034-
if multiZoneVolumeId, isMultiZone := isMultiZoneDisk(diskRsrc, d.Labels); isMultiZone {
1035-
_, ok := multiZoneNodesByVolumeId[multiZoneVolumeId]
1036-
if !ok {
1037-
multiZoneNodesByVolumeId[multiZoneVolumeId] = []string{}
1038-
}
1039-
multiZoneNodesByVolumeId[multiZoneVolumeId] = append(multiZoneNodesByVolumeId[multiZoneVolumeId], users...)
1086+
if multiZoneVolumeId, isMultiZone := isMultiZoneDisk(volumeId, d.Labels); isMultiZone {
1087+
multiZoneVolumeIdsByVolumeId[volumeId] = multiZoneVolumeId
1088+
nodesByVolumeId[multiZoneVolumeId] = append(nodesByVolumeId[multiZoneVolumeId], instanceIds...)
10401089
}
10411090
}
1042-
entries = append(entries, &csi.ListVolumesResponse_Entry{
1043-
Volume: &csi.Volume{
1044-
VolumeId: diskRsrc,
1045-
},
1046-
Status: &csi.ListVolumesResponse_VolumeStatus{
1047-
PublishedNodeIds: users,
1048-
},
1049-
})
10501091
}
1051-
for volumeId, nodeIds := range multiZoneNodesByVolumeId {
1092+
1093+
entries := []*csi.ListVolumesResponse_Entry{}
1094+
for _, instance := range instances {
1095+
instanceId, err := getResourceId(instance.SelfLink)
1096+
if err != nil {
1097+
klog.Warningf("Bad ListVolumes instance resource %s, skipped: %v (%+v)", instance.SelfLink, err, instance)
1098+
continue
1099+
}
1100+
for _, disk := range instance.Disks {
1101+
volumeId, err := getResourceId(disk.Source)
1102+
if err != nil {
1103+
klog.Warningf("Bad ListVolumes instance disk source %s, skipped: %v (%+v)", disk.Source, err, instance)
1104+
continue
1105+
}
1106+
1107+
nodesByVolumeId[volumeId] = append(nodesByVolumeId[volumeId], instanceId)
1108+
if multiZoneVolumeId, isMultiZone := multiZoneVolumeIdsByVolumeId[volumeId]; isMultiZone {
1109+
nodesByVolumeId[multiZoneVolumeId] = append(nodesByVolumeId[multiZoneVolumeId], instanceId)
1110+
}
1111+
}
1112+
}
1113+
1114+
for volumeId, nodeIds := range nodesByVolumeId {
10521115
entries = append(entries, &csi.ListVolumesResponse_Entry{
10531116
Volume: &csi.Volume{
10541117
VolumeId: volumeId,

0 commit comments

Comments
 (0)