@@ -21,6 +21,7 @@ import (
21
21
"math/rand"
22
22
"regexp"
23
23
"sort"
24
+ "strings"
24
25
"time"
25
26
26
27
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
@@ -33,6 +34,7 @@ import (
33
34
"k8s.io/apimachinery/pkg/util/uuid"
34
35
"k8s.io/client-go/util/flowcontrol"
35
36
"k8s.io/klog/v2"
37
+ "k8s.io/utils/strings/slices"
36
38
37
39
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
38
40
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
@@ -101,6 +103,14 @@ type workItem struct {
101
103
unpublishReq * csi.ControllerUnpublishVolumeRequest
102
104
}
103
105
106
+ // locationRequirements are additional location topology requirements that must be respected when creating a volume.
107
+ type locationRequirements struct {
108
+ srcVolRegion string
109
+ srcVolZone string
110
+ srcReplicationType string
111
+ cloneReplicationType string
112
+ }
113
+
104
114
var _ csi.ControllerServer = & GCEControllerServer {}
105
115
106
116
const (
@@ -145,6 +155,44 @@ func isDiskReady(disk *gce.CloudDisk) (bool, error) {
145
155
return false , nil
146
156
}
147
157
158
+ // cloningLocationRequirements returns additional location requirements to be applied to the given create volume requests topology.
159
+ // If the CreateVolumeRequest will use volume cloning, location requirements in compliance with the volume cloning limitations
160
+ // will be returned: https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/volume-cloning#limitations.
161
+ func cloningLocationRequirements (req * csi.CreateVolumeRequest , cloneReplicationType string ) (* locationRequirements , error ) {
162
+ if ! useVolumeCloning (req ) {
163
+ return nil , nil
164
+ }
165
+ // If we are using volume cloning, this will be set.
166
+ volSrc := req .VolumeContentSource .GetVolume ()
167
+ volSrcVolID := volSrc .GetVolumeId ()
168
+
169
+ _ , sourceVolKey , err := common .VolumeIDToKey (volSrcVolID )
170
+ if err != nil {
171
+ return nil , fmt .Errorf ("volume ID is invalid: %w" , err )
172
+ }
173
+
174
+ isZonalSrcVol := sourceVolKey .Type () == meta .Zonal
175
+ if isZonalSrcVol {
176
+ region , err := common .GetRegionFromZones ([]string {sourceVolKey .Zone })
177
+ if err != nil {
178
+ return nil , fmt .Errorf ("failed to get region from zones: %w" , err )
179
+ }
180
+ sourceVolKey .Region = region
181
+ }
182
+
183
+ srcReplicationType := replicationTypeNone
184
+ if ! isZonalSrcVol {
185
+ srcReplicationType = replicationTypeRegionalPD
186
+ }
187
+
188
+ return & locationRequirements {srcVolZone : sourceVolKey .Zone , srcVolRegion : sourceVolKey .Region , srcReplicationType : srcReplicationType , cloneReplicationType : cloneReplicationType }, nil
189
+ }
190
+
191
+ // useVolumeCloning returns true if the create volume request should be created with volume cloning.
192
+ func useVolumeCloning (req * csi.CreateVolumeRequest ) bool {
193
+ return req .VolumeContentSource != nil && req .VolumeContentSource .GetVolume () != nil
194
+ }
195
+
148
196
func (gceCS * GCEControllerServer ) CreateVolume (ctx context.Context , req * csi.CreateVolumeRequest ) (* csi.CreateVolumeResponse , error ) {
149
197
var err error
150
198
diskTypeForMetric := ""
@@ -187,12 +235,21 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
187
235
if multiWriter {
188
236
gceAPIVersion = gce .GCEAPIVersionBeta
189
237
}
238
+
239
+ var locationTopReq * locationRequirements
240
+ if useVolumeCloning (req ) {
241
+ locationTopReq , err = cloningLocationRequirements (req , params .ReplicationType )
242
+ if err != nil {
243
+ return nil , status .Errorf (codes .InvalidArgument , "failed to get location requirements: %v" , err .Error ())
244
+ }
245
+ }
246
+
190
247
// Determine the zone or zones+region of the disk
191
248
var zones []string
192
249
var volKey * meta.Key
193
250
switch params .ReplicationType {
194
251
case replicationTypeNone :
195
- zones , err = pickZones (ctx , gceCS , req .GetAccessibilityRequirements (), 1 )
252
+ zones , err = pickZones (ctx , gceCS , req .GetAccessibilityRequirements (), 1 , locationTopReq )
196
253
if err != nil {
197
254
return nil , status .Errorf (codes .InvalidArgument , "CreateVolume failed to pick zones for disk: %v" , err .Error ())
198
255
}
@@ -202,7 +259,7 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre
202
259
volKey = meta .ZonalKey (name , zones [0 ])
203
260
204
261
case replicationTypeRegionalPD :
205
- zones , err = pickZones (ctx , gceCS , req .GetAccessibilityRequirements (), 2 )
262
+ zones , err = pickZones (ctx , gceCS , req .GetAccessibilityRequirements (), 2 , locationTopReq )
206
263
if err != nil {
207
264
return nil , status .Errorf (codes .InvalidArgument , "CreateVolume failed to pick zones for disk: %v" , err .Error ())
208
265
}
@@ -1436,7 +1493,29 @@ func diskIsAttachedAndCompatible(deviceName string, instance *compute.Instance,
1436
1493
return false , nil
1437
1494
}
1438
1495
1439
- func pickZonesFromTopology (top * csi.TopologyRequirement , numZones int ) ([]string , error ) {
1496
+ // pickZonesInRegion will remove any zones that are not in the given region.
1497
+ func pickZonesInRegion (region string , zones []string ) []string {
1498
+ refinedZones := []string {}
1499
+ for _ , zone := range zones {
1500
+ if strings .Contains (zone , region ) {
1501
+ refinedZones = append (refinedZones , zone )
1502
+ }
1503
+ }
1504
+ return refinedZones
1505
+ }
1506
+
1507
+ func prependZone (zone string , zones []string ) []string {
1508
+ newZones := []string {zone }
1509
+ for i := 0 ; i < len (zones ); i ++ {
1510
+ // Do not add a zone if it is equal to the zone that is already prepended to newZones.
1511
+ if zones [i ] != zone {
1512
+ newZones = append (newZones , zones [i ])
1513
+ }
1514
+ }
1515
+ return newZones
1516
+ }
1517
+
1518
+ func pickZonesFromTopology (top * csi.TopologyRequirement , numZones int , locationTopReq * locationRequirements ) ([]string , error ) {
1440
1519
reqZones , err := getZonesFromTopology (top .GetRequisite ())
1441
1520
if err != nil {
1442
1521
return nil , fmt .Errorf ("could not get zones from requisite topology: %w" , err )
@@ -1446,6 +1525,39 @@ func pickZonesFromTopology(top *csi.TopologyRequirement, numZones int) ([]string
1446
1525
return nil , fmt .Errorf ("could not get zones from preferred topology: %w" , err )
1447
1526
}
1448
1527
1528
+ if locationTopReq != nil {
1529
+ srcVolZone := locationTopReq .srcVolZone
1530
+ switch locationTopReq .cloneReplicationType {
1531
+ // For zonal -> zonal cloning, the source disk zone must match the destination disk zone.
1532
+ case replicationTypeNone :
1533
+ // If the source volume zone is not in the topology requirement, we return an error.
1534
+ if ! slices .Contains (prefZones , srcVolZone ) && ! slices .Contains (reqZones , srcVolZone ) {
1535
+ volumeCloningReq := fmt .Sprintf ("clone zone must match source disk zone: %s" , srcVolZone )
1536
+ return nil , fmt .Errorf ("failed to find zone from topology %v: %s" , top , volumeCloningReq )
1537
+ }
1538
+ return []string {srcVolZone }, nil
1539
+ // For zonal or regional -> regional disk cloning, the source disk region must match the destination disk region.
1540
+ case replicationTypeRegionalPD :
1541
+ srcVolRegion := locationTopReq .srcVolRegion
1542
+ prefZones = pickZonesInRegion (srcVolRegion , prefZones )
1543
+ reqZones = pickZonesInRegion (srcVolRegion , reqZones )
1544
+
1545
+ if len (prefZones ) == 0 && len (reqZones ) == 0 {
1546
+ volumeCloningReq := fmt .Sprintf ("clone zone must reside in source disk region %s" , srcVolRegion )
1547
+ return nil , fmt .Errorf ("failed to find zone from topology %v: %s" , top , volumeCloningReq )
1548
+ }
1549
+
1550
+ // For zonal -> regional disk cloning, one of the replicated zones must match the source zone.
1551
+ if locationTopReq .srcReplicationType == replicationTypeNone {
1552
+ if ! slices .Contains (prefZones , srcVolZone ) && ! slices .Contains (reqZones , srcVolZone ) {
1553
+ volumeCloningReq := fmt .Sprintf ("one of the replica zones of the clone must match the source disk zone: %s" , srcVolZone )
1554
+ return nil , fmt .Errorf ("failed to find zone from topology %v: %s" , top , volumeCloningReq )
1555
+ }
1556
+ prefZones = prependZone (srcVolZone , prefZones )
1557
+ }
1558
+ }
1559
+ }
1560
+
1449
1561
if numZones <= len (prefZones ) {
1450
1562
return prefZones [0 :numZones ], nil
1451
1563
} else {
@@ -1504,16 +1616,25 @@ func getZoneFromSegment(seg map[string]string) (string, error) {
1504
1616
return zone , nil
1505
1617
}
1506
1618
1507
- func pickZones (ctx context.Context , gceCS * GCEControllerServer , top * csi.TopologyRequirement , numZones int ) ([]string , error ) {
1619
+ func pickZones (ctx context.Context , gceCS * GCEControllerServer , top * csi.TopologyRequirement , numZones int , locationTopReq * locationRequirements ) ([]string , error ) {
1508
1620
var zones []string
1509
1621
var err error
1510
1622
if top != nil {
1511
- zones , err = pickZonesFromTopology (top , numZones )
1623
+ zones , err = pickZonesFromTopology (top , numZones , locationTopReq )
1512
1624
if err != nil {
1513
1625
return nil , fmt .Errorf ("failed to pick zones from topology: %w" , err )
1514
1626
}
1515
1627
} else {
1516
- zones , err = getDefaultZonesInRegion (ctx , gceCS , []string {gceCS .CloudProvider .GetDefaultZone ()}, numZones )
1628
+ existingZones := []string {gceCS .CloudProvider .GetDefaultZone ()}
1629
+ // We set existingZones to the source volume zone so that for zonal -> zonal cloning, the clone is provisioned
1630
+ // in the same zone as the source volume, and for zonal -> regional, one of the replicated zones will always
1631
+ // be the zone of the source volume. For regional -> regional cloning, the srcVolZone will not be set, so we
1632
+ // just use the default zone.
1633
+ if locationTopReq != nil && locationTopReq .srcReplicationType == replicationTypeNone {
1634
+ existingZones = []string {locationTopReq .srcVolZone }
1635
+ }
1636
+ // If topology is nil, then the Immediate binding mode was used without setting allowedTopologies in the storageclass.
1637
+ zones , err = getDefaultZonesInRegion (ctx , gceCS , existingZones , numZones )
1517
1638
if err != nil {
1518
1639
return nil , fmt .Errorf ("failed to get default %v zones in region: %w" , numZones , err )
1519
1640
}
0 commit comments