Skip to content

Commit 0ab0dad

Browse files
committed
Implementation with unit tests
1 parent 1b15343 commit 0ab0dad

File tree

290 files changed

+17254
-68
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

290 files changed

+17254
-68
lines changed

cmd/gce-pd-csi-driver/main.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"strings"
2828
"time"
2929

30+
"k8s.io/client-go/kubernetes"
31+
"k8s.io/client-go/rest"
3032
"k8s.io/klog/v2"
3133
"k8s.io/utils/strings/slices"
3234
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -94,6 +96,8 @@ var (
9496

9597
extraTagsStr = flag.String("extra-tags", "", "Extra tags to attach to each Compute Disk, Image, Snapshot created. It is a comma separated list of parent id, key and value like '<parent_id1>/<tag_key1>/<tag_value1>,...,<parent_idN>/<tag_keyN>/<tag_valueN>'. parent_id is the Organization or the Project ID or Project name where the tag key and the tag value resources exist. A maximum of 50 tags bindings is allowed for a resource. See https://cloud.google.com/resource-manager/docs/tags/tags-overview, https://cloud.google.com/resource-manager/docs/tags/tags-creating-and-managing for details")
9698

99+
diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a topology.gke.io/[disk-type] topology label to the Topologies returned in CreateVolumeResponse")
100+
97101
version string
98102
)
99103

@@ -227,7 +231,10 @@ func handle() {
227231
}
228232
initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
229233
maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
230-
controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag)
234+
args := &driver.GCEControllerServerArgs{
235+
EnableDiskTopology: *diskTopology,
236+
}
237+
controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag, args)
231238
} else if *cloudConfigFilePath != "" {
232239
klog.Warningf("controller service is disabled but cloud config given - it has no effect")
233240
}
@@ -239,6 +246,7 @@ func handle() {
239246
if err != nil {
240247
klog.Fatalf("Failed to get safe mounter: %v", err.Error())
241248
}
249+
242250
deviceUtils := deviceutils.NewDeviceUtils()
243251
statter := mountmanager.NewStatter(mounter)
244252
meta, err := metadataservice.NewMetadataService()
@@ -249,13 +257,26 @@ func handle() {
249257
if err != nil {
250258
klog.Fatalf("Failed to get node info from API server: %v", err.Error())
251259
}
252-
nsArgs := driver.NodeServerArgs{
260+
261+
nsArgs := &driver.NodeServerArgs{
253262
EnableDeviceInUseCheck: *enableDeviceInUseCheck,
254263
DeviceInUseTimeout: *deviceInUseTimeout,
255264
EnableDataCache: *enableDataCacheFlag,
256265
DataCacheEnabledNodePool: isDataCacheEnabledNodePool,
266+
EnableDiskTopology: *diskTopology,
257267
}
268+
269+
if *diskTopology {
270+
klog.V(2).Infof("Setting up kubeClient")
271+
kubeClient, err := instantiateKubeClient()
272+
if err != nil {
273+
klog.Fatalf("Failed to instantiate Kubernetes client: %v", err)
274+
}
275+
nsArgs.KubeClient = kubeClient
276+
}
277+
258278
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
279+
259280
if *maxConcurrentFormatAndMount > 0 {
260281
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
261282
}
@@ -292,6 +313,18 @@ func handle() {
292313
gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing, metricsManager)
293314
}
294315

316+
func instantiateKubeClient() (*kubernetes.Clientset, error) {
317+
cfg, err := rest.InClusterConfig()
318+
if err != nil {
319+
return nil, fmt.Errorf("failed to create REST Config for k8s client: %w", err)
320+
}
321+
kubeClient, err := kubernetes.NewForConfig(cfg)
322+
if err != nil {
323+
return nil, fmt.Errorf("failed to create k8s client: %w", err)
324+
}
325+
return kubeClient, nil
326+
}
327+
295328
func notEmpty(v string) bool {
296329
return v != ""
297330
}

deploy/kubernetes/base/controller/cluster_setup.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ rules:
205205
verbs: ['use']
206206
resourceNames:
207207
- csi-gce-pd-node-psp
208-
- apiGroups: [""]
208+
- apiGroups: [""] # The core API group
209209
resources: ["nodes"]
210210
verbs: ["get", "list"]
211211
---
@@ -220,6 +220,9 @@ rules:
220220
verbs: ['use']
221221
resourceNames:
222222
- csi-gce-pd-node-psp-win
223+
- apiGroups: [""] # The core API group
224+
resources: ["nodes"]
225+
verbs: ["get", "list"]
223226
---
224227

225228
apiVersion: rbac.authorization.k8s.io/v1

pkg/common/constants.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ package common
1818

1919
const (
2020
// Keys for Topology. This key will be shared amongst drivers from GCP
21-
TopologyKeyZone = "topology.gke.io/zone"
21+
TopologyKeyPrefix = "topology.gke.io"
22+
TopologyKeyZone = TopologyKeyPrefix + "/zone"
2223

2324
// VolumeAttributes for Partition
2425
VolumeAttributePartition = "partition"

pkg/common/utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,3 +764,11 @@ func MapNumber(num int64) int64 {
764764
}
765765
return 0
766766
}
767+
768+
func IsGKETopologyLabel(key string) bool {
769+
return strings.HasPrefix(key, TopologyKeyPrefix)
770+
}
771+
772+
func TopologyLabelKey(diskType string) string {
773+
return fmt.Sprintf("%s/%s", TopologyKeyPrefix, diskType)
774+
}

pkg/gce-cloud-provider/metadata/fake.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ type fakeServiceManager struct{}
2020

2121
var _ MetadataService = &fakeServiceManager{}
2222

23-
const (
24-
FakeZone = "country-region-zone"
25-
FakeProject = "test-project"
23+
var (
24+
FakeMachineType = "n1-standard-1"
25+
FakeZone = "country-region-zone"
26+
FakeProject = "test-project"
27+
FakeName = "test-name"
2628
)
2729

28-
var FakeMachineType = "n1-standard-1"
29-
3030
func NewFakeService() MetadataService {
3131
return &fakeServiceManager{}
3232
}
@@ -40,7 +40,7 @@ func (manager *fakeServiceManager) GetProject() string {
4040
}
4141

4242
func (manager *fakeServiceManager) GetName() string {
43-
return "test-name"
43+
return FakeName
4444
}
4545

4646
func (manager *fakeServiceManager) GetMachineType() string {
@@ -50,3 +50,11 @@ func (manager *fakeServiceManager) GetMachineType() string {
5050
func SetMachineType(s string) {
5151
FakeMachineType = s
5252
}
53+
54+
func SetZone(s string) {
55+
FakeZone = s
56+
}
57+
58+
func SetName(s string) {
59+
FakeName = s
60+
}

pkg/gce-pd-csi-driver/controller.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ type GCEControllerServer struct {
120120
// Embed UnimplementedControllerServer to ensure the driver returns Unimplemented for any
121121
// new RPC methods that might be introduced in future versions of the spec.
122122
csi.UnimplementedControllerServer
123+
124+
EnableDiskTopology bool
125+
}
126+
127+
type GCEControllerServerArgs struct {
128+
EnableDiskTopology bool
123129
}
124130

125131
type MultiZoneVolumeHandleConfig struct {
@@ -320,7 +326,7 @@ func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req
320326
if len(req.GetName()) == 0 {
321327
return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
322328
}
323-
if volumeCapabilities == nil || len(volumeCapabilities) == 0 {
329+
if len(volumeCapabilities) == 0 {
324330
return nil, status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided")
325331
}
326332

@@ -517,7 +523,7 @@ func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req *
517523
// Use the first response as a template
518524
volumeId := fmt.Sprintf("projects/%s/zones/%s/disks/%s", gceCS.CloudProvider.GetDefaultProject(), common.MultiZoneValue, req.GetName())
519525
klog.V(4).Infof("CreateVolume succeeded for multi-zone disks in zones %s: %v", zones, multiZoneVolKey)
520-
return generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil
526+
return gceCS.generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil
521527
}
522528

523529
func (gceCS *GCEControllerServer) getZonesWithDiskNameAndType(ctx context.Context, name string, diskType string) ([]string, error) {
@@ -623,7 +629,7 @@ func (gceCS *GCEControllerServer) createSingleDeviceDisk(ctx context.Context, re
623629
return nil, common.LoggedError("CreateVolume failed: %v", err)
624630
}
625631

626-
return generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), err
632+
return gceCS.generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), err
627633
}
628634

629635
func getAccessMode(req *csi.CreateVolumeRequest, params common.DiskParameters) (string, error) {
@@ -2304,9 +2310,11 @@ func getZonesFromTopology(topList []*csi.Topology) ([]string, error) {
23042310
func getZoneFromSegment(seg map[string]string) (string, error) {
23052311
var zone string
23062312
for k, v := range seg {
2307-
switch k {
2308-
case common.TopologyKeyZone:
2313+
switch {
2314+
case k == common.TopologyKeyZone:
23092315
zone = v
2316+
case common.IsGKETopologyLabel(k):
2317+
continue
23102318
default:
23112319
return "", fmt.Errorf("topology segment has unknown key %v", k)
23122320
}
@@ -2396,21 +2404,30 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) {
23962404
case contextForceAttach:
23972405
b, err := common.ConvertStringToBool(val)
23982406
if err != nil {
2399-
return nil, fmt.Errorf("Bad volume context force attach: %v", err)
2407+
return nil, fmt.Errorf("bad volume context force attach: %w", err)
24002408
}
24012409
info.ForceAttach = b
24022410
}
24032411
}
24042412
return info, nil
24052413
}
24062414

2407-
func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse {
2415+
func (gceCS *GCEControllerServer) generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse {
24082416
tops := []*csi.Topology{}
24092417
for _, zone := range zones {
2410-
tops = append(tops, &csi.Topology{
2411-
Segments: map[string]string{common.TopologyKeyZone: zone},
2412-
})
2418+
top := &csi.Topology{
2419+
Segments: map[string]string{
2420+
common.TopologyKeyZone: zone,
2421+
},
2422+
}
2423+
2424+
if gceCS.EnableDiskTopology {
2425+
top.Segments[common.TopologyLabelKey(params.DiskType)] = "true"
2426+
}
2427+
2428+
tops = append(tops, top)
24132429
}
2430+
24142431
realDiskSizeBytes := common.GbToBytes(disk.GetSizeGb())
24152432
createResp := &csi.CreateVolumeResponse{
24162433
Volume: &csi.Volume{

0 commit comments

Comments
 (0)