@@ -50,7 +50,7 @@ import (
50
50
// if it has volumes. It also verifies that the pods in the desired state of the
51
51
// world cache still exist, if not, it removes them.
52
52
type DesiredStateOfWorldPopulator interface {
53
- Run (sourcesReady config. SourcesReady , stopCh <- chan struct {} )
53
+ Run (ctx context. Context , sourcesReady config. SourcesReady )
54
54
55
55
// ReprocessPod sets value for the specified pod in processedPods
56
56
// to false, forcing it to be reprocessed. This is required to enable
@@ -141,21 +141,22 @@ type processedPods struct {
141
141
sync.RWMutex
142
142
}
143
143
144
- func (dswp * desiredStateOfWorldPopulator ) Run (sourcesReady config. SourcesReady , stopCh <- chan struct {} ) {
144
+ func (dswp * desiredStateOfWorldPopulator ) Run (ctx context. Context , sourcesReady config. SourcesReady ) {
145
145
// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
146
- klog .InfoS ("Desired state populator starts to run" )
147
- wait .PollUntil (dswp .loopSleepDuration , func () (bool , error ) {
146
+ logger := klog .FromContext (ctx )
147
+ logger .Info ("Desired state populator starts to run" )
148
+ _ = wait .PollUntilContextCancel (ctx , dswp .loopSleepDuration , false , func (ctx context.Context ) (bool , error ) {
148
149
done := sourcesReady .AllReady ()
149
- dswp .populatorLoop ()
150
+ dswp .populatorLoop (ctx )
150
151
return done , nil
151
- }, stopCh )
152
+ })
152
153
dswp .hasAddedPodsLock .Lock ()
153
154
if ! dswp .hasAddedPods {
154
- klog . InfoS ("Finished populating initial desired state of world" )
155
+ logger . Info ("Finished populating initial desired state of world" )
155
156
dswp .hasAddedPods = true
156
157
}
157
158
dswp .hasAddedPodsLock .Unlock ()
158
- wait .Until ( dswp .populatorLoop , dswp .loopSleepDuration , stopCh )
159
+ wait .UntilWithContext ( ctx , dswp .populatorLoop , dswp .loopSleepDuration )
159
160
}
160
161
161
162
func (dswp * desiredStateOfWorldPopulator ) ReprocessPod (
@@ -169,14 +170,14 @@ func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool {
169
170
return dswp .hasAddedPods
170
171
}
171
172
172
- func (dswp * desiredStateOfWorldPopulator ) populatorLoop () {
173
- dswp .findAndAddNewPods ()
173
+ func (dswp * desiredStateOfWorldPopulator ) populatorLoop (ctx context. Context ) {
174
+ dswp .findAndAddNewPods (ctx )
174
175
dswp .findAndRemoveDeletedPods ()
175
176
}
176
177
177
178
// Iterate through all pods and add to desired state of world if they don't
178
179
// exist but should
179
- func (dswp * desiredStateOfWorldPopulator ) findAndAddNewPods () {
180
+ func (dswp * desiredStateOfWorldPopulator ) findAndAddNewPods (ctx context. Context ) {
180
181
// Map unique pod name to outer volume name to MountedVolume.
181
182
mountedVolumesForPod := make (map [volumetypes.UniquePodName ]map [string ]cache.MountedVolume )
182
183
for _ , mountedVolume := range dswp .actualStateOfWorld .GetMountedVolumes () {
@@ -201,7 +202,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
201
202
continue
202
203
}
203
204
204
- dswp .processPodVolumes (pod , mountedVolumesForPod )
205
+ dswp .processPodVolumes (ctx , pod , mountedVolumesForPod )
205
206
}
206
207
}
207
208
@@ -284,12 +285,14 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
284
285
// processPodVolumes processes the volumes in the given pod and adds them to the
285
286
// desired state of the world.
286
287
func (dswp * desiredStateOfWorldPopulator ) processPodVolumes (
288
+ ctx context.Context ,
287
289
pod * v1.Pod ,
288
290
mountedVolumesForPod map [volumetypes.UniquePodName ]map [string ]cache.MountedVolume ) {
289
291
if pod == nil {
290
292
return
291
293
}
292
294
295
+ logger := klog .FromContext (ctx )
293
296
uniquePodName := util .GetUniquePodName (pod )
294
297
if dswp .podPreviouslyProcessed (uniquePodName ) {
295
298
return
@@ -302,14 +305,14 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
302
305
for _ , podVolume := range pod .Spec .Volumes {
303
306
if ! mounts .Has (podVolume .Name ) && ! devices .Has (podVolume .Name ) {
304
307
// Volume is not used in the pod, ignore it.
305
- klog .V (4 ).InfoS ("Skipping unused volume" , "pod" , klog .KObj (pod ), "volumeName" , podVolume .Name )
308
+ logger .V (4 ).Info ("Skipping unused volume" , "pod" , klog .KObj (pod ), "volumeName" , podVolume .Name )
306
309
continue
307
310
}
308
311
309
312
pvc , volumeSpec , volumeGidValue , err :=
310
- dswp .createVolumeSpec (podVolume , pod , mounts , devices )
313
+ dswp .createVolumeSpec (logger , podVolume , pod , mounts , devices )
311
314
if err != nil {
312
- klog . ErrorS (err , "Error processing volume" , "pod" , klog .KObj (pod ), "volumeName" , podVolume .Name )
315
+ logger . Error (err , "Error processing volume" , "pod" , klog .KObj (pod ), "volumeName" , podVolume .Name )
313
316
dswp .desiredStateOfWorld .AddErrorToPod (uniquePodName , err .Error ())
314
317
allVolumesAdded = false
315
318
continue
@@ -319,11 +322,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
319
322
_ , err = dswp .desiredStateOfWorld .AddPodToVolume (
320
323
uniquePodName , pod , volumeSpec , podVolume .Name , volumeGidValue , seLinuxContainerContexts [podVolume .Name ])
321
324
if err != nil {
322
- klog . ErrorS (err , "Failed to add volume to desiredStateOfWorld" , "pod" , klog .KObj (pod ), "volumeName" , podVolume .Name , "volumeSpecName" , volumeSpec .Name ())
325
+ logger . Error (err , "Failed to add volume to desiredStateOfWorld" , "pod" , klog .KObj (pod ), "volumeName" , podVolume .Name , "volumeSpecName" , volumeSpec .Name ())
323
326
dswp .desiredStateOfWorld .AddErrorToPod (uniquePodName , err .Error ())
324
327
allVolumesAdded = false
325
328
} else {
326
- klog .V (4 ).InfoS ("Added volume to desired state" , "pod" , klog .KObj (pod ), "volumeName" , podVolume .Name , "volumeSpecName" , volumeSpec .Name ())
329
+ logger .V (4 ).Info ("Added volume to desired state" , "pod" , klog .KObj (pod ), "volumeName" , podVolume .Name , "volumeSpecName" , volumeSpec .Name ())
327
330
}
328
331
329
332
dswp .checkVolumeFSResize (pod , podVolume , pvc , volumeSpec , uniquePodName , mountedVolumesForPod )
@@ -456,7 +459,7 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
456
459
// specified volume. It dereference any PVC to get PV objects, if needed.
457
460
// Returns an error if unable to obtain the volume at this time.
458
461
func (dswp * desiredStateOfWorldPopulator ) createVolumeSpec (
459
- podVolume v1.Volume , pod * v1.Pod , mounts , devices sets.Set [string ]) (* v1.PersistentVolumeClaim , * volume.Spec , string , error ) {
462
+ logger klog. Logger , podVolume v1.Volume , pod * v1.Pod , mounts , devices sets.Set [string ]) (* v1.PersistentVolumeClaim , * volume.Spec , string , error ) {
460
463
pvcSource := podVolume .VolumeSource .PersistentVolumeClaim
461
464
isEphemeral := pvcSource == nil && podVolume .VolumeSource .Ephemeral != nil
462
465
if isEphemeral {
@@ -469,7 +472,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
469
472
}
470
473
}
471
474
if pvcSource != nil {
472
- klog .V (5 ).InfoS ("Found PVC" , "PVC" , klog .KRef (pod .Namespace , pvcSource .ClaimName ))
475
+ logger .V (5 ).Info ("Found PVC" , "PVC" , klog .KRef (pod .Namespace , pvcSource .ClaimName ))
473
476
// If podVolume is a PVC, fetch the real PV behind the claim
474
477
pvc , err := dswp .getPVCExtractPV (
475
478
pod .Namespace , pvcSource .ClaimName )
@@ -486,7 +489,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
486
489
}
487
490
}
488
491
pvName , pvcUID := pvc .Spec .VolumeName , pvc .UID
489
- klog .V (5 ).InfoS ("Found bound PV for PVC" , "PVC" , klog .KRef (pod .Namespace , pvcSource .ClaimName ), "PVCUID" , pvcUID , "PVName" , pvName )
492
+ logger .V (5 ).Info ("Found bound PV for PVC" , "PVC" , klog .KRef (pod .Namespace , pvcSource .ClaimName ), "PVCUID" , pvcUID , "PVName" , pvName )
490
493
// Fetch actual PV object
491
494
volumeSpec , volumeGidValue , err :=
492
495
dswp .getPVSpec (pvName , pvcSource .ReadOnly , pvcUID )
@@ -497,13 +500,13 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
497
500
pvcSource .ClaimName ,
498
501
err )
499
502
}
500
- klog .V (5 ).InfoS ("Extracted volumeSpec from bound PV and PVC" , "PVC" , klog .KRef (pod .Namespace , pvcSource .ClaimName ), "PVCUID" , pvcUID , "PVName" , pvName , "volumeSpecName" , volumeSpec .Name ())
503
+ logger .V (5 ).Info ("Extracted volumeSpec from bound PV and PVC" , "PVC" , klog .KRef (pod .Namespace , pvcSource .ClaimName ), "PVCUID" , pvcUID , "PVName" , pvName , "volumeSpecName" , volumeSpec .Name ())
501
504
migratable , err := dswp .csiMigratedPluginManager .IsMigratable (volumeSpec )
502
505
if err != nil {
503
506
return nil , nil , "" , err
504
507
}
505
508
if migratable {
506
- volumeSpec , err = csimigration .TranslateInTreeSpecToCSI (volumeSpec , pod .Namespace , dswp .intreeToCSITranslator )
509
+ volumeSpec , err = csimigration .TranslateInTreeSpecToCSI (logger , volumeSpec , pod .Namespace , dswp .intreeToCSITranslator )
507
510
if err != nil {
508
511
return nil , nil , "" , err
509
512
}
@@ -539,7 +542,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
539
542
return nil , nil , "" , err
540
543
}
541
544
if migratable {
542
- spec , err = csimigration .TranslateInTreeSpecToCSI (spec , pod .Namespace , dswp .intreeToCSITranslator )
545
+ spec , err = csimigration .TranslateInTreeSpecToCSI (logger , spec , pod .Namespace , dswp .intreeToCSITranslator )
543
546
if err != nil {
544
547
return nil , nil , "" , err
545
548
}
0 commit comments