Skip to content

Commit 4cd6fa5

Browse files
authored
enhance clean up of CNSVolumeOperations instances to delete CnsVolumeOperationRequest instances (#3375)
with latest TaskInvocationTimestamp older than 15 minutes
1 parent 02d2f5a commit 4cd6fa5

File tree

2 files changed

+38
-90
lines changed

2 files changed

+38
-90
lines changed

pkg/common/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ const (
8080
DefaultCSIFetchPreferredDatastoresIntervalInMin = 5
8181
// DefaultCnsVolumeOperationRequestCleanupIntervalInMin is the default time
8282
// interval after which stale CnsVolumeOperationRequest CRs will be cleaned up.
83-
// Current default value is set to 24 hours.
84-
DefaultCnsVolumeOperationRequestCleanupIntervalInMin = 1440
83+
// Current default value is set to 15 minutes.
84+
DefaultCnsVolumeOperationRequestCleanupIntervalInMin = 15
8585
// DefaultGlobalMaxSnapshotsPerBlockVolume is the default maximum number of block volume snapshots per volume.
8686
DefaultGlobalMaxSnapshotsPerBlockVolume = 3
8787
// MaxNumberOfTopologyCategories is the max number of topology domains/categories allowed.

pkg/internalapis/cnsvolumeoperationrequest/cnsvolumeoperationrequest.go

Lines changed: 36 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131

3232
csiconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
3333
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"
34-
csitypes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/types"
3534
cnsvolumeoperationrequestconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/cnsvolumeoperationrequest/config"
3635
cnsvolumeoprequestv1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/cnsvolumeoperationrequest/v1alpha1"
3736
k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes"
@@ -127,7 +126,7 @@ func InitVolumeOperationRequestInterface(ctx context.Context, cleanupInterval in
127126
operationRequestStoreInstance = &operationRequestStore{
128127
k8sclient: k8sclient,
129128
}
130-
go operationRequestStoreInstance.cleanupStaleInstances(cleanupInterval, isBlockVolumeSnapshotEnabled)
129+
go operationRequestStoreInstance.cleanupStaleInstances(cleanupInterval)
131130
}
132131
// Store PodVMOnStretchedSupervisor FSS value for later use.
133132
isPodVMOnStretchSupervisorFSSEnabled = isPodVMOnStretchSupervisorEnabled
@@ -352,109 +351,58 @@ func (or *operationRequestStore) DeleteRequestDetails(ctx context.Context, name
352351
return nil
353352
}
354353

355-
// cleanupStaleInstances cleans up CnsVolumeOperationRequest instances for
356-
// volumes that are no longer present in the kubernetes cluster.
357-
func (or *operationRequestStore) cleanupStaleInstances(cleanupInterval int, isBlockVolumeSnapshotEnabled func() bool) {
354+
// cleanupStaleInstances cleans up CnsVolumeOperationRequest instances
355+
// whose latest TaskInvocationTimestamp is older than 15 minutes.
356+
func (or *operationRequestStore) cleanupStaleInstances(cleanupInterval int) {
358357
ticker := time.NewTicker(time.Duration(cleanupInterval) * time.Minute)
359358
ctx, log := logger.GetNewContextWithLogger()
360359
log.Infof("CnsVolumeOperationRequest clean up interval is set to %d minutes", cleanupInterval)
361360
for ; true; <-ticker.C {
361+
cutoffTime := time.Now().Add(-15 * time.Minute)
362+
continueToken := ""
362363
log.Infof("Cleaning up stale CnsVolumeOperationRequest instances.")
363-
364-
instanceMap := make(map[string]bool)
365-
366-
cnsVolumeOperationRequestList := &cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequestList{}
367-
err := or.k8sclient.List(ctx, cnsVolumeOperationRequestList)
368-
if err != nil {
369-
log.Errorf("failed to list CnsVolumeOperationRequests with error %v. Abandoning "+
370-
"CnsVolumeOperationRequests clean up ...", err)
371-
continue
372-
}
373-
374-
k8sclient, err := k8s.NewClient(ctx)
375-
if err != nil {
376-
log.Errorf("failed to get k8sclient with error: %v. Abandoning CnsVolumeOperationRequests "+
377-
"clean up ...", err)
378-
continue
379-
}
380-
pvList, err := k8sclient.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
381-
if err != nil {
382-
log.Errorf("failed to list PersistentVolumes with error %v. Abandoning "+
383-
"CnsVolumeOperationRequests clean up ...", err)
384-
continue
385-
}
386-
387-
for _, pv := range pvList.Items {
388-
if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == csitypes.Name {
389-
instanceMap[pv.Name] = true
390-
volumeHandle := pv.Spec.CSI.VolumeHandle
391-
if strings.Contains(volumeHandle, "file") {
392-
volumeHandle = strings.ReplaceAll(volumeHandle, ":", "-")
393-
}
394-
instanceMap[volumeHandle] = true
364+
for {
365+
listOptions := &client.ListOptions{
366+
Limit: 5000,
367+
Continue: continueToken,
395368
}
396-
}
397-
blockVolumeSnapshotEnabled := isBlockVolumeSnapshotEnabled()
398-
// skip cleaning up of snapshot related CnsVolumeOperationRequests if FSS is not enabled.
399-
if blockVolumeSnapshotEnabled {
400-
snapshotterClient, err := k8s.NewSnapshotterClient(ctx)
369+
cnsVolumeOperationRequestList := &cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequestList{}
370+
err := or.k8sclient.List(ctx, cnsVolumeOperationRequestList, listOptions)
401371
if err != nil {
402-
log.Errorf("failed to get snapshotterClient with error: %v. Abandoning "+
372+
log.Errorf("failed to list CnsVolumeOperationRequests with error %v. Abandoning "+
403373
"CnsVolumeOperationRequests clean up ...", err)
404-
return
374+
break
405375
}
406-
407-
// the List API below ensures VolumeSnapshotContent CRD is installed and lists the existing
408-
// VolumeSnapshotContent CRs in cluster.
409-
vscList, err := snapshotterClient.SnapshotV1().VolumeSnapshotContents().List(ctx, metav1.ListOptions{})
410-
if err != nil {
411-
log.Errorf("failed to list VolumeSnapshotContents with error %v. Abandoning "+
412-
"CnsVolumeOperationRequests clean up ...", err)
413-
return
414-
}
415-
416-
for _, vsc := range vscList.Items {
417-
if vsc.Spec.Driver != csitypes.Name {
376+
for _, instance := range cnsVolumeOperationRequestList.Items {
377+
latestOperationDetailsLength := len(instance.Status.LatestOperationDetails)
378+
// Skip if task is still in progress
379+
if latestOperationDetailsLength != 0 &&
380+
instance.Status.LatestOperationDetails[latestOperationDetailsLength-1].TaskStatus ==
381+
TaskInvocationStatusInProgress {
418382
continue
419383
}
420-
volumeHandle := vsc.Spec.Source.VolumeHandle
421-
if volumeHandle != nil {
422-
// CnsVolumeOperation instance for CreateSnapshot
423-
instanceMap[strings.TrimPrefix(vsc.Name, "snapcontent-")+"-"+*volumeHandle] = true
424-
}
425-
if vsc.Status != nil && vsc.Status.SnapshotHandle != nil {
426-
// CnsVolumeOperation instance for DeleteSnapshot
427-
instanceMap[strings.Replace(*vsc.Status.SnapshotHandle, "+", "-", 1)] = true
384+
// Delete instance if TaskInvocationTimestamp is older than 15 minutes
385+
if latestOperationDetailsLength != 0 &&
386+
instance.Status.LatestOperationDetails[latestOperationDetailsLength-1].
387+
TaskInvocationTimestamp.Time.After(cutoffTime) {
388+
log.Debugf("CnsVolumeOperationRequest instance %q is skipped for deletion", instance.Name)
389+
continue
428390
}
429-
}
430-
}
431-
432-
for _, instance := range cnsVolumeOperationRequestList.Items {
433-
latestOperationDetailsLength := len(instance.Status.LatestOperationDetails)
434-
if latestOperationDetailsLength != 0 &&
435-
instance.Status.LatestOperationDetails[latestOperationDetailsLength-1].TaskStatus ==
436-
TaskInvocationStatusInProgress {
437-
continue
438-
}
439-
var trimmedName string
440-
switch {
441-
case strings.HasPrefix(instance.Name, "pvc"):
442-
trimmedName = instance.Name
443-
case strings.HasPrefix(instance.Name, "delete"):
444-
trimmedName = strings.TrimPrefix(instance.Name, "delete-")
445-
case strings.HasPrefix(instance.Name, "expand"):
446-
trimmedName = strings.TrimPrefix(instance.Name, "expand-")
447-
case blockVolumeSnapshotEnabled && strings.HasPrefix(instance.Name, "snapshot"):
448-
trimmedName = strings.TrimPrefix(instance.Name, "snapshot-")
449-
case blockVolumeSnapshotEnabled && strings.HasPrefix(instance.Name, "deletesnapshot"):
450-
trimmedName = strings.TrimPrefix(instance.Name, "deletesnapshot-")
451-
}
452-
if _, ok := instanceMap[trimmedName]; !ok {
391+
log.Debugf("Calling DeleteRequestDetails for %q", instance.Name)
453392
err = or.DeleteRequestDetails(ctx, instance.Name)
454393
if err != nil {
455394
log.Errorf("failed to delete CnsVolumeOperationRequest instance %s with error %v",
456395
instance.Name, err)
457396
}
397+
log.Debugf("CnsVolumeOperationRequest instance %q is deleted", instance.Name)
398+
}
399+
// Exit if there are no more pages
400+
continueToken = cnsVolumeOperationRequestList.GetContinue()
401+
log.Debugf("continueToken to process remaining CnsVolumeOperationRequest "+
402+
"insances is %q", continueToken)
403+
if continueToken == "" {
404+
log.Infof("all CnsVolumeOperationRequest instances are processed")
405+
break
458406
}
459407
}
460408
log.Infof("Clean up of stale CnsVolumeOperationRequest complete.")

0 commit comments

Comments
 (0)