@@ -37,6 +37,7 @@ import (
37
37
apierrors "k8s.io/apimachinery/pkg/api/errors"
38
38
"k8s.io/apimachinery/pkg/runtime"
39
39
"k8s.io/apimachinery/pkg/types"
40
+ "k8s.io/client-go/kubernetes"
40
41
"k8s.io/client-go/kubernetes/scheme"
41
42
"k8s.io/client-go/tools/record"
42
43
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -65,13 +66,31 @@ import (
65
66
var (
66
67
backOffDuration map [types.NamespacedName ]time.Duration
67
68
backOffDurationMapMutex = sync.Mutex {}
69
+ // Per volume lock for concurrent access to PVCs.
70
+ // Keys are strings representing namespace + PVC name.
71
+ // Values are individual sync.Mutex locks that need to be held
72
+ // to make updates to the PVC on the API server.
73
+ VolumeLock * sync.Map
68
74
)
69
75
70
76
const (
71
77
workerThreadsEnvVar = "WORKER_THREADS_NODEVM_BATCH_ATTACH"
72
78
defaultMaxWorkerThreads = 10
73
79
)
74
80
81
+ var newClientFunc = func (ctx context.Context ) (kubernetes.Interface , error ) {
82
+ log := logger .GetLogger (ctx )
83
+
84
+ // Initializes kubernetes client.
85
+ k8sclient , err := k8s .NewClient (ctx )
86
+ if err != nil {
87
+ log .Errorf ("Creating Kubernetes client failed. Err: %v" , err )
88
+ return nil , err
89
+ }
90
+
91
+ return k8sclient , nil
92
+ }
93
+
75
94
func Add (mgr manager.Manager , clusterFlavor cnstypes.CnsClusterFlavor ,
76
95
configInfo * config.ConfigurationInfo , volumeManager volumes.Manager ) error {
77
96
ctx , log := logger .GetNewContextWithLogger ()
@@ -149,6 +168,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
149
168
return err
150
169
}
151
170
171
+ VolumeLock = & sync.Map {}
152
172
backOffDuration = make (map [types.NamespacedName ]time.Duration )
153
173
return nil
154
174
}
@@ -236,6 +256,13 @@ func (r *Reconciler) Reconcile(ctx context.Context,
236
256
return r .completeReconciliationWithError (batchAttachCtx , instance , request .NamespacedName , timeout , err )
237
257
}
238
258
259
+ // Initializes kubernetes client.
260
+ k8sClient , err := newClientFunc (ctx )
261
+ if err != nil {
262
+ log .Errorf ("Creating Kubernetes client failed. Err: %v" , err )
263
+ return r .completeReconciliationWithError (batchAttachCtx , instance , request .NamespacedName , timeout , err )
264
+ }
265
+
239
266
volumesToDetach := make (map [string ]string )
240
267
if vm == nil {
241
268
// If VM is nil, it means it is deleted from the vCenter.
@@ -251,7 +278,7 @@ func (r *Reconciler) Reconcile(ctx context.Context,
251
278
request .NamespacedName )
252
279
} else {
253
280
// If VM was found on vCenter, find the volumes to be detached from it.
254
- volumesToDetach , err = getVolumesToDetach (batchAttachCtx , instance , vm , r .client )
281
+ volumesToDetach , err = getVolumesToDetach (batchAttachCtx , instance , vm , r .client , k8sClient )
255
282
if err != nil {
256
283
log .Errorf ("failed to find volumes to detach for instance %s. Err: %s" ,
257
284
request .NamespacedName .String (), err )
@@ -264,7 +291,17 @@ func (r *Reconciler) Reconcile(ctx context.Context,
264
291
// This means all volumes can be considered detached. So remove finalizer from CR instance.
265
292
if instance .DeletionTimestamp != nil && vm == nil {
266
293
log .Infof ("Instance %s is being deleted and VM object is also deleted from VC" , request .NamespacedName .String ())
267
- // TODO: remove PVC finalizer
294
+
295
+ // For every PVC mentioned in instance.Spec, remove finalizer from its PVC.
296
+ for _ , volume := range instance .Spec .Volumes {
297
+ err := removePvcFinalizer (ctx , r .client , k8sClient , volume .PersistentVolumeClaim .ClaimName , instance .Namespace ,
298
+ instance .Spec .NodeUUID )
299
+ if err != nil {
300
+ log .Errorf ("failed to remove finalizer from PVC %s. Err: %s" , volume .PersistentVolumeClaim .ClaimName ,
301
+ err )
302
+ return r .completeReconciliationWithError (batchAttachCtx , instance , request .NamespacedName , timeout , err )
303
+ }
304
+ }
268
305
269
306
patchErr := removeFinalizerFromCRDInstance (batchAttachCtx , instance , r .client )
270
307
if patchErr != nil {
@@ -304,7 +341,7 @@ func (r *Reconciler) Reconcile(ctx context.Context,
304
341
}
305
342
306
343
// Call reconcile when deletion timestamp is not set on the instance.
307
- err := r .reconcileInstanceWithoutDeletionTimestamp (batchAttachCtx , instance , volumesToDetach , vm )
344
+ err := r .reconcileInstanceWithoutDeletionTimestamp (batchAttachCtx , k8sClient , instance , volumesToDetach , vm )
308
345
if err != nil {
309
346
log .Errorf ("failed to reconile instance %s. Err: %s" , request .NamespacedName .String (), err )
310
347
return r .completeReconciliationWithError (batchAttachCtx , instance , request .NamespacedName , timeout , err )
@@ -317,7 +354,7 @@ func (r *Reconciler) Reconcile(ctx context.Context,
317
354
log .Infof ("Deletion timestamp observed on instance %s. Detaching all volumes." , request .NamespacedName .String ())
318
355
319
356
// Call reconcile when deletion timestamp is set on the instance.
320
- err := r .reconcileInstanceWithDeletionTimestamp (batchAttachCtx , instance , volumesToDetach , vm )
357
+ err := r .reconcileInstanceWithDeletionTimestamp (batchAttachCtx , k8sClient , instance , volumesToDetach , vm )
321
358
if err != nil {
322
359
log .Errorf ("failed to reconcile instance %s. Err: %s" , request .NamespacedName .String (), err )
323
360
return r .completeReconciliationWithError (batchAttachCtx , instance , request .NamespacedName , timeout , err )
@@ -332,12 +369,13 @@ func (r *Reconciler) Reconcile(ctx context.Context,
332
369
// reconcileInstanceWithDeletionTimestamp calls detach volume for all volumes present in volumesToDetach.
333
370
// As the instance is being deleted, we do not need to attach anything.
334
371
func (r * Reconciler ) reconcileInstanceWithDeletionTimestamp (ctx context.Context ,
372
+ k8sClient kubernetes.Interface ,
335
373
instance * v1alpha1.CnsNodeVmBatchAttachment ,
336
374
volumesToDetach map [string ]string ,
337
375
vm * cnsvsphere.VirtualMachine ) error {
338
376
log := logger .GetLogger (ctx )
339
377
340
- err := r .processDetach (ctx , vm , instance , volumesToDetach )
378
+ err := r .processDetach (ctx , k8sClient , vm , instance , volumesToDetach )
341
379
if err != nil {
342
380
log .Errorf ("failed to detach all volumes. Err: %s" , err )
343
381
return err
@@ -350,6 +388,7 @@ func (r *Reconciler) reconcileInstanceWithDeletionTimestamp(ctx context.Context,
350
388
// reconcileInstanceWithoutDeletionTimestamp calls CNS batch attach for all volumes in instance spec
351
389
// and CNS detach for the volumes volumesToDetach.
352
390
func (r * Reconciler ) reconcileInstanceWithoutDeletionTimestamp (ctx context.Context ,
391
+ k8sClient kubernetes.Interface ,
353
392
instance * v1alpha1.CnsNodeVmBatchAttachment ,
354
393
volumesToDetach map [string ]string ,
355
394
vm * cnsvsphere.VirtualMachine ) error {
@@ -358,7 +397,7 @@ func (r *Reconciler) reconcileInstanceWithoutDeletionTimestamp(ctx context.Conte
358
397
var detachErr error
359
398
// Call detach if there are some volumes which need to be detached.
360
399
if len (volumesToDetach ) != 0 {
361
- detachErr = r .processDetach (ctx , vm , instance , volumesToDetach )
400
+ detachErr = r .processDetach (ctx , k8sClient , vm , instance , volumesToDetach )
362
401
if detachErr != nil {
363
402
log .Errorf ("failed to detach all volumes. Err: %s" , detachErr )
364
403
} else {
@@ -367,7 +406,7 @@ func (r *Reconciler) reconcileInstanceWithoutDeletionTimestamp(ctx context.Conte
367
406
}
368
407
369
408
// Call batch attach for volumes.
370
- attachErr := r .processBatchAttach (ctx , vm , instance )
409
+ attachErr := r .processBatchAttach (ctx , k8sClient , vm , instance )
371
410
if attachErr != nil {
372
411
log .Errorf ("failed to attach all volumes. Err: %+v" , attachErr )
373
412
}
@@ -377,12 +416,13 @@ func (r *Reconciler) reconcileInstanceWithoutDeletionTimestamp(ctx context.Conte
377
416
378
417
// processDetach detaches each of the volumes in volumesToDetach by calling CNS DetachVolume API.
379
418
func (r * Reconciler ) processDetach (ctx context.Context ,
419
+ k8sClient kubernetes.Interface ,
380
420
vm * cnsvsphere.VirtualMachine ,
381
421
instance * v1alpha1.CnsNodeVmBatchAttachment , volumesToDetach map [string ]string ) error {
382
422
log := logger .GetLogger (ctx )
383
423
log .Debugf ("Calling detach volume for PVC %+v" , volumesToDetach )
384
424
385
- volumesThatFailedToDetach := r .detachVolumes (ctx , vm , volumesToDetach , instance )
425
+ volumesThatFailedToDetach := r .detachVolumes (ctx , k8sClient , vm , volumesToDetach , instance )
386
426
387
427
var overallErr error
388
428
if len (volumesThatFailedToDetach ) != 0 {
@@ -398,6 +438,7 @@ func (r *Reconciler) processDetach(ctx context.Context,
398
438
399
439
// detachVolumes calls Cns DetachVolume for every PVC in volumesToDetach.
400
440
func (r * Reconciler ) detachVolumes (ctx context.Context ,
441
+ k8sClient kubernetes.Interface ,
401
442
vm * cnsvsphere.VirtualMachine , volumesToDetach map [string ]string ,
402
443
instance * v1alpha1.CnsNodeVmBatchAttachment ) []string {
403
444
log := logger .GetLogger (ctx )
@@ -414,11 +455,9 @@ func (r *Reconciler) detachVolumes(ctx context.Context,
414
455
// If VM was not found, can assume that the detach is successful.
415
456
if cnsvsphere .IsManagedObjectNotFound (detachErr , vm .VirtualMachine .Reference ()) {
416
457
log .Infof ("Found a managed object not found fault for vm: %+v" , vm )
417
- // TODO: remove PVC finalizer
418
-
419
- // Remove entry of this volume from the instance's status.
420
- deleteVolumeFromStatus (pvc , instance )
421
- log .Infof ("Successfully detached volume %s from VM %s" , pvc , instance .Spec .NodeUUID )
458
+ // VM not found, so marking detach as Success and removing finalizer from PVC
459
+ volumesThatFailedToDetach = removeFinalizerAndStatusEntry (ctx , r .client , k8sClient ,
460
+ instance , pvc , volumesThatFailedToDetach )
422
461
} else {
423
462
log .Errorf ("failed to detach volume %s from VM %s. Fault: %s Err: %s" ,
424
463
pvc , instance .Spec .NodeUUID , faulttype , detachErr )
@@ -427,10 +466,9 @@ func (r *Reconciler) detachVolumes(ctx context.Context,
427
466
volumesThatFailedToDetach = append (volumesThatFailedToDetach , pvc )
428
467
}
429
468
} else {
430
- // TODO: remove PVC finalizer
431
- // Remove entry of this volume from the instance's status.
432
- deleteVolumeFromStatus (pvc , instance )
433
- log .Infof ("Successfully detached volume %s from VM %s" , pvc , instance .Spec .NodeUUID )
469
+ // Remove finalizer from the PVC as the detach was successful.
470
+ volumesThatFailedToDetach = removeFinalizerAndStatusEntry (ctx , r .client , k8sClient ,
471
+ instance , pvc , volumesThatFailedToDetach )
434
472
}
435
473
log .Infof ("Detach call ended for PVC %s in namespace %s for instance %s" ,
436
474
pvc , instance .Namespace , instance .Name )
@@ -440,9 +478,31 @@ func (r *Reconciler) detachVolumes(ctx context.Context,
440
478
return volumesThatFailedToDetach
441
479
}
442
480
481
+ // removeFinalizerAndStatusEntry removes finalizer from the given PVC and
482
+ // removes its entry from the instance status if it is successful.
483
+ // If removing the finalizer fails, it adds the volume to volumesThatFailedToDetach list.
484
+ func removeFinalizerAndStatusEntry (ctx context.Context , client client.Client , k8sClient kubernetes.Interface ,
485
+ instance * v1alpha1.CnsNodeVmBatchAttachment , pvc string ,
486
+ volumesThatFailedToDetach []string ) []string {
487
+ log := logger .GetLogger (ctx )
488
+
489
+ err := removePvcFinalizer (ctx , client , k8sClient , pvc , instance .Namespace , instance .Spec .NodeUUID )
490
+ if err != nil {
491
+ log .Errorf ("failed to remove finalizer from PVC %s. Err: %s" , pvc , err )
492
+ updateInstanceWithErrorForPvc (instance , pvc , err .Error ())
493
+ volumesThatFailedToDetach = append (volumesThatFailedToDetach , pvc )
494
+ } else {
495
+ // Remove entry of this volume from the instance's status.
496
+ deleteVolumeFromStatus (pvc , instance )
497
+ log .Infof ("Successfully detached volume %s from VM %s" , pvc , instance .Spec .NodeUUID )
498
+ }
499
+ return volumesThatFailedToDetach
500
+ }
501
+
443
502
// processBatchAttach first constructs the batch attach volume request for all volumes in instance spec
444
503
// and then calls CNS batch attach for them.
445
- func (r * Reconciler ) processBatchAttach (ctx context.Context , vm * cnsvsphere.VirtualMachine ,
504
+ func (r * Reconciler ) processBatchAttach (ctx context.Context , k8sClient kubernetes.Interface ,
505
+ vm * cnsvsphere.VirtualMachine ,
446
506
instance * v1alpha1.CnsNodeVmBatchAttachment ) error {
447
507
log := logger .GetLogger (ctx )
448
508
@@ -474,6 +534,17 @@ func (r *Reconciler) processBatchAttach(ctx context.Context, vm *cnsvsphere.Virt
474
534
return fmt .Errorf ("failed to get volumeName for pvc %s" , pvcName )
475
535
476
536
}
537
+
538
+ // If attach was successful, add finalizer to the PVC.
539
+ if result .Error == nil {
540
+ // Add finalizer on PVC as attach was successful.
541
+ err = addPvcFinalizer (ctx , r .client , k8sClient , pvcName , instance .Namespace , instance .Spec .NodeUUID )
542
+ if err != nil {
543
+ log .Errorf ("failed to add finalizer %s on PVC %s" , cnsoperatortypes .CNSPvcFinalizer , pvcName )
544
+ result .Error = err
545
+ attachErr = err
546
+ }
547
+ }
477
548
// Update instance with attach result
478
549
updateInstanceWithAttachVolumeResult (instance , volumeName , pvcName , result )
479
550
}
0 commit comments