
Commit 18c35d2

Only add disk support Topology if all nodes have a disk support label
1 parent 31e3f89

File tree

430 files changed (+34725, -12 lines changed)

Large commits have some content hidden by default; only a subset of the 430 changed files is shown below.

cmd/gce-pd-csi-driver/main.go

Lines changed: 18 additions & 1 deletion
@@ -38,6 +38,7 @@ import (
 	driver "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-pd-csi-driver"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
 	mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/nodelabels"
 )

 var (
@@ -96,7 +97,7 @@ var (

 	extraTagsStr = flag.String("extra-tags", "", "Extra tags to attach to each Compute Disk, Image, Snapshot created. It is a comma separated list of parent id, key and value like '<parent_id1>/<tag_key1>/<tag_value1>,...,<parent_idN>/<tag_keyN>/<tag_valueN>'. parent_id is the Organization or the Project ID or Project name where the tag key and the tag value resources exist. A maximum of 50 tags bindings is allowed for a resource. See https://cloud.google.com/resource-manager/docs/tags/tags-overview, https://cloud.google.com/resource-manager/docs/tags/tags-creating-and-managing for details")

-	diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a topology.gke.io/[disk-type] topology label to the Topologies returned in CreateVolumeResponse")
+	diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a topology.gke.io/[disk-type] topology label to the Topologies returned in CreateVolumeResponse. Label is only added if all cluster nodes have at least one disk support label")

 	version string
 )
@@ -229,11 +230,27 @@ func handle() {
 		if err != nil {
 			klog.Fatalf("Failed to get cloud provider: %v", err.Error())
 		}
+
 		initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
 		maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
 		args := &driver.GCEControllerServerArgs{
 			EnableDiskTopology: *diskTopology,
 		}
+
+		if *diskTopology {
+			klog.V(2).Infof("Setting up kubeClient")
+			kubeClient, err := instantiateKubeClient()
+			if err != nil {
+				klog.Fatalf("Failed to instantiate Kubernetes client: %v", err)
+			}
+
+			klog.V(2).Infof("Setting up node lister with informer")
+			labelVerifier, err := nodelabels.NewVerifier(ctx, kubeClient)
+			if err != nil {
+				klog.Fatalf("Failed to create node label verifier: %v", err)
+			}
+
+			args.LabelVerifier = labelVerifier
+		}
+
 		controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag, args)
 	} else if *cloudConfigFilePath != "" {
 		klog.Warningf("controller service is disabled but cloud config given - it has no effect")

pkg/gce-pd-csi-driver/controller.go

Lines changed: 37 additions & 7 deletions
@@ -40,6 +40,7 @@ import (
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/nodelabels"
 )

 type GCEControllerServer struct {
@@ -122,10 +123,12 @@ type GCEControllerServer struct {
 	csi.UnimplementedControllerServer

 	EnableDiskTopology bool
+	LabelVerifier      *nodelabels.Verifier
 }

 type GCEControllerServerArgs struct {
 	EnableDiskTopology bool
+	LabelVerifier      *nodelabels.Verifier
 }

 type MultiZoneVolumeHandleConfig struct {
@@ -471,9 +474,7 @@ func (gceCS *GCEControllerServer) getMultiZoneProvisioningZones(ctx context.Cont
 }

 func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool) (*csi.CreateVolumeResponse, error) {
-	// Determine the zones that are needed.
 	var err error
-
 	// For multi-zone, we either select:
 	// 1) The zones specified in requisite topology requirements
 	// 2) All zones in the region that are compatible with the selected disk type
@@ -523,7 +524,12 @@ func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req *
 	// Use the first response as a template
 	volumeId := fmt.Sprintf("projects/%s/zones/%s/disks/%s", gceCS.CloudProvider.GetDefaultProject(), common.MultiZoneValue, req.GetName())
 	klog.V(4).Infof("CreateVolume succeeded for multi-zone disks in zones %s: %v", zones, multiZoneVolKey)
-	return gceCS.generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil
+
+	resp, err := gceCS.generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId)
+	if err != nil {
+		return nil, fmt.Errorf("failed to generate createVolumeResponse: %w", err)
+	}
+	return resp, nil
 }

 func (gceCS *GCEControllerServer) getZonesWithDiskNameAndType(ctx context.Context, name string, diskType string) ([]string, error) {
@@ -623,13 +629,22 @@ func (gceCS *GCEControllerServer) createSingleDeviceDisk(ctx context.Context, re
 		return nil, status.Errorf(codes.Aborted, common.VolumeOperationAlreadyExistsFmt, volumeID)
 	}
 	defer gceCS.volumeLocks.Release(volumeID)
+<<<<<<< HEAD
 	disk, err := gceCS.createSingleDisk(ctx, req, params, volKey, zones, accessMode)
+=======
+>>>>>>> 474af8f4 (Only add disk support Topology if all nodes have a disk support label)

+	disk, err := gceCS.createSingleDisk(ctx, req, params, volKey, zones)
 	if err != nil {
 		return nil, common.LoggedError("CreateVolume failed: %v", err)
 	}

-	return gceCS.generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), err
+	resp, err := gceCS.generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to generate createVolumeResponse: %w", err)
+	}
+
+	return resp, nil
 }

 func getAccessMode(req *csi.CreateVolumeRequest, params common.DiskParameters) (string, error) {
@@ -2412,7 +2427,7 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) {
 	return info, nil
 }

-func (gceCS *GCEControllerServer) generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse {
+func (gceCS *GCEControllerServer) generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) (*csi.CreateVolumeResponse, error) {
 	tops := []*csi.Topology{}
 	for _, zone := range zones {
 		top := &csi.Topology{
@@ -2422,7 +2437,14 @@ func (gceCS *GCEControllerServer) generateCreateVolumeResponseWithVolumeId(disk
 		}

 		if gceCS.EnableDiskTopology {
-			top.Segments[common.TopologyLabelKey(params.DiskType)] = "true"
+			klog.V(4).Infof("Verifying disk support labels on cluster nodes before adding disk support topology")
+			addDiskSupportTopology, err := gceCS.allNodesHaveDiskSupportLabel()
+			if err != nil {
+				return nil, fmt.Errorf("failed to check if all nodes have disk support label: %w", err)
+			}
+			if addDiskSupportTopology {
+				top.Segments[common.TopologyLabelKey(params.DiskType)] = "true"
+			}
 		}

 		tops = append(tops, top)
@@ -2479,7 +2501,15 @@ func (gceCS *GCEControllerServer) generateCreateVolumeResponseWithVolumeId(disk
 		}
 		createResp.Volume.ContentSource = contentSource
 	}
-	return createResp
+	return createResp, nil
+}
+
+func (gceCS *GCEControllerServer) allNodesHaveDiskSupportLabel() (bool, error) {
+	allNodesHaveDiskSupportLabel, err := gceCS.LabelVerifier.AllNodesHaveDiskSupportLabel()
+	if err != nil {
+		return false, fmt.Errorf("failed to check if all nodes have disk support label: %w", err)
+	}
+	return allNodesHaveDiskSupportLabel, nil
 }

 func getResourceId(resourceLink string) (string, error) {
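For context, the sketch below (not part of this commit) illustrates the two shapes the returned accessible topology can take once the verifier's answer is factored in. csi.Topology is the real CSI Go binding used above; the literal "topology.gke.io/..." key strings are assumptions based on the --disk-topology flag help text and common.TopologyKeyZone, since the exact output of common.TopologyLabelKey is not shown in this diff.

// Illustrative only; not part of commit 18c35d2. Key strings are assumed.
package main

import (
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

func main() {
	// All cluster nodes carry a disk support label: the zone segment is
	// joined by a per-disk-type segment (assumed key format).
	withDiskTopology := &csi.Topology{Segments: map[string]string{
		"topology.gke.io/zone":   "us-central1-a",
		"topology.gke.io/pd-ssd": "true",
	}}

	// At least one node lacks a disk support label: only the zone segment
	// is returned, as before this change.
	zoneOnly := &csi.Topology{Segments: map[string]string{
		"topology.gke.io/zone": "us-central1-a",
	}}

	fmt.Println(withDiskTopology.Segments)
	fmt.Println(zoneOnly.Segments)
}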

pkg/gce-pd-csi-driver/controller_test.go

Lines changed: 19 additions & 4 deletions
@@ -37,6 +37,8 @@ import (
 	"google.golang.org/api/googleapi"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/util/flowcontrol"
 	"k8s.io/klog/v2"
@@ -47,6 +49,7 @@ import (
 	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
 	gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
 	gcecloudprovider "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/nodelabels"
 )

 const (
@@ -617,7 +620,7 @@ func TestCreateVolumeArguments(t *testing.T) {
 		enableStoragePools bool
 		expVol             *csi.Volume
 		expErrCode         codes.Code
-		EnableDiskTopology bool
+		enableDiskTopology bool
 	}{
 		{
 			name: "success default",
@@ -1371,7 +1374,7 @@
 					},
 				},
 			},
-			EnableDiskTopology: true,
+			enableDiskTopology: true,
 		},
 		{
 			// Desired as the disk type label should match the `type` parameter,
@@ -1414,19 +1417,31 @@
 					},
 				},
 			},
-			EnableDiskTopology: true,
+			enableDiskTopology: true,
 		},
 	}

 	// Run test cases
 	for _, tc := range testCases {
 		t.Logf("test case: %s", tc.name)
+
 		// Setup new driver each time so no interference
 		args := &GCEControllerServerArgs{
-			EnableDiskTopology: tc.EnableDiskTopology,
+			EnableDiskTopology: tc.enableDiskTopology,
+			LabelVerifier: nodelabels.NewFakeVerifier(t, []*corev1.Node{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "node-with-disk-support-label",
+						Labels: map[string]string{
+							common.TopologyLabelKey(stdDiskType): "true",
+						},
+					},
+				},
+			}),
 		}
 		gceDriver := initGCEDriver(t, nil, args)
 		gceDriver.cs.enableStoragePools = tc.enableStoragePools
+
 		// Start Test
 		resp, err := gceDriver.cs.CreateVolume(context.Background(), tc.req)
 		if err != nil {

pkg/gce-pd-csi-driver/gce-pd-driver.go

Lines changed: 1 addition & 0 deletions
@@ -177,6 +177,7 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, err
 		provisionableDisksConfig: provisionableDisksConfig,
 		enableHdHA:               enableHdHA,
 		EnableDiskTopology:       args.EnableDiskTopology,
+		LabelVerifier:            args.LabelVerifier,
 	}
 }

pkg/nodelabels/fake.go

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+package nodelabels
+
+import (
+	"context"
+	"testing"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/fake"
+)
+
+func NewFakeVerifier(t *testing.T, nodes []*corev1.Node) *Verifier {
+	ctx := context.Background()
+	clientset := fakeKubeClient(nodes)
+
+	verifier, err := NewVerifier(ctx, clientset)
+	if err != nil {
+		t.Fatalf("failed to create verifier: %v", err)
+	}
+
+	return verifier
+}
+
+func fakeKubeClient(nodes []*corev1.Node) kubernetes.Interface {
+	// Convert the list of nodes to a slice of runtime.Object
+	var objects []runtime.Object
+	for _, node := range nodes {
+		objects = append(objects, node)
+	}
+
+	// Create a fake clientset with the predefined objects
+	clientset := fake.NewSimpleClientset(objects...)
+
+	return clientset
+}

pkg/nodelabels/verifier.go

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+package nodelabels
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	listers "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
+)
+
+// resyncInterval is the interval at which the informer resyncs. Should the
+// informer cache miss an event, the resync will correct the cache.
+const resyncInterval = 10 * time.Minute
+
+// Verifier checks node labels for GKE topology labels.
+type Verifier struct {
+	nodeLister listers.NodeLister
+}
+
+// NewVerifier creates an informer for listing nodes and returns a
+// lister. It waits for the informer to sync and connects the factory's stop
+// channel to the context's done channel to prevent goroutine leaks when the
+// binary is stopped.
+func NewVerifier(ctx context.Context, clientset kubernetes.Interface) (*Verifier, error) {
+	stopCh := make(chan struct{})
+	// Should the context be cancelled upstream, we want to stop the informer.
+	// Documentation suggests that this prevents goroutine leaks.
+	go func() {
+		<-ctx.Done()
+		close(stopCh)
+	}()
+
+	factory := informers.NewSharedInformerFactory(clientset, resyncInterval)
+	nodeInformer := factory.Core().V1().Nodes().Informer()
+	nodeLister := factory.Core().V1().Nodes().Lister()
+
+	factory.Start(stopCh)
+	synced := cache.WaitForCacheSync(stopCh, nodeInformer.HasSynced)
+	if !synced {
+		return nil, fmt.Errorf("failed to sync node informer")
+	}
+
+	return &Verifier{
+		nodeLister: nodeLister,
+	}, nil
+}
+
+func (v *Verifier) AllNodesHaveDiskSupportLabel() (bool, error) {
+	nodes, err := v.nodeLister.List(labels.Everything())
+	if err != nil {
+		return false, fmt.Errorf("failed to list nodes: %v", err)
+	}
+	return allNodesHaveDiskSupportLabel(nodes), nil
+}
+
+func allNodesHaveDiskSupportLabel(nodes []*corev1.Node) bool {
+	if len(nodes) == 0 {
+		return false
+	}
+
+	for _, node := range nodes {
+		if !nodeHasDiskSupportLabel(node) {
+			klog.V(4).Infof("Node %s does not have disk support label", node.GetName())
+			return false
+		}
+	}
+	return true
+}
+
+func nodeHasDiskSupportLabel(node *corev1.Node) bool {
+	labels := node.GetLabels()
+	for key := range labels {
+		if isNonZoneGKETopologyLabel(key) {
+			return true
+		}
+	}
+	return false
+}
+
+func isNonZoneGKETopologyLabel(key string) bool {
+	return common.IsGKETopologyLabel(key) && key != common.TopologyKeyZone
+}
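The fake verifier makes the new check easy to exercise on its own. The test below is a hedged sketch (not part of this commit) that drives AllNodesHaveDiskSupportLabel through both outcomes; it assumes, as the controller test above does, that common.TopologyLabelKey yields a non-zone topology.gke.io key, and it uses "hyperdisk-balanced" purely as an example disk type.

// Illustrative test sketch; not part of commit 18c35d2.
package nodelabels_test

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
	"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/nodelabels"
)

func TestAllNodesHaveDiskSupportLabel(t *testing.T) {
	labeled := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "labeled-node",
			Labels: map[string]string{common.TopologyLabelKey("hyperdisk-balanced"): "true"},
		},
	}
	unlabeled := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "unlabeled-node"},
	}

	// Every node carries a disk support label: the disk support topology may be added.
	v := nodelabels.NewFakeVerifier(t, []*corev1.Node{labeled})
	if ok, err := v.AllNodesHaveDiskSupportLabel(); err != nil || !ok {
		t.Fatalf("expected all nodes labeled, got ok=%v err=%v", ok, err)
	}

	// A single unlabeled node is enough to withhold the disk support topology.
	v = nodelabels.NewFakeVerifier(t, []*corev1.Node{labeled, unlabeled})
	if ok, err := v.AllNodesHaveDiskSupportLabel(); err != nil || ok {
		t.Fatalf("expected not all nodes labeled, got ok=%v err=%v", ok, err)
	}
}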
