Skip to content

Commit 1298c8a

Browse files
committed
csi-translation-lib: Support structured and contextual logging
1 parent 8ba158c commit 1298c8a

36 files changed

+257
-185
lines changed

hack/golangci-hints.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ linters-settings: # please keep this alphabetized
136136
contextual k8s.io/component-helpers/.*
137137
contextual k8s.io/cri-api/.*
138138
contextual k8s.io/cri-client/.*
139+
contextual k8s.io/csi-translation-lib/.*
139140
contextual k8s.io/dynamic-resource-allocation/.*
140141
contextual k8s.io/endpointslice/.*
141142
contextual k8s.io/kms/.*

hack/golangci-strict.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ linters-settings: # please keep this alphabetized
182182
contextual k8s.io/component-helpers/.*
183183
contextual k8s.io/cri-api/.*
184184
contextual k8s.io/cri-client/.*
185+
contextual k8s.io/csi-translation-lib/.*
185186
contextual k8s.io/dynamic-resource-allocation/.*
186187
contextual k8s.io/endpointslice/.*
187188
contextual k8s.io/kms/.*

hack/golangci.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ linters-settings: # please keep this alphabetized
185185
contextual k8s.io/component-helpers/.*
186186
contextual k8s.io/cri-api/.*
187187
contextual k8s.io/cri-client/.*
188+
contextual k8s.io/csi-translation-lib/.*
188189
contextual k8s.io/dynamic-resource-allocation/.*
189190
contextual k8s.io/endpointslice/.*
190191
contextual k8s.io/kms/.*

hack/logcheck.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ contextual k8s.io/client-go/tools/record/.*
3232
contextual k8s.io/component-helpers/.*
3333
contextual k8s.io/cri-api/.*
3434
contextual k8s.io/cri-client/.*
35+
contextual k8s.io/csi-translation-lib/.*
3536
contextual k8s.io/dynamic-resource-allocation/.*
3637
contextual k8s.io/endpointslice/.*
3738
contextual k8s.io/kms/.*

pkg/controller/volume/attachdetach/attach_detach_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,7 +711,7 @@ func (adc *attachDetachController) processVolumeAttachments(logger klog.Logger)
711711
// PV is migrated and should be handled by the CSI plugin instead of the in-tree one
712712
plugin, _ = adc.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
713713
// podNamespace is not needed here for Azurefile as the volumeName generated will be the same with or without podNamespace
714-
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(volumeSpec, "" /* podNamespace */, adc.intreeToCSITranslator)
714+
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(logger, volumeSpec, "" /* podNamespace */, adc.intreeToCSITranslator)
715715
if err != nil {
716716
logger.Error(err, "Failed to translate intree volumeSpec to CSI volumeSpec for volume", "node", klog.KRef("", string(nodeName)), "inTreePluginName", inTreePluginName, "vaName", va.Name, "PV", klog.KRef("", *pvName))
717717
continue

pkg/controller/volume/attachdetach/util/util.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func CreateVolumeSpec(logger klog.Logger, podVolume v1.Volume, pod *v1.Pod, node
8181
err)
8282
}
8383

84-
volumeSpec, err = translateInTreeSpecToCSIIfNeeded(volumeSpec, nodeName, vpm, csiMigratedPluginManager, csiTranslator, pod.Namespace)
84+
volumeSpec, err = translateInTreeSpecToCSIIfNeeded(logger, volumeSpec, nodeName, vpm, csiMigratedPluginManager, csiTranslator, pod.Namespace)
8585
if err != nil {
8686
return nil, fmt.Errorf(
8787
"error performing CSI migration checks and translation for PVC %q/%q: %v",
@@ -100,7 +100,7 @@ func CreateVolumeSpec(logger klog.Logger, podVolume v1.Volume, pod *v1.Pod, node
100100
clonedPodVolume := podVolume.DeepCopy()
101101

102102
origspec := volume.NewSpecFromVolume(clonedPodVolume)
103-
spec, err := translateInTreeSpecToCSIIfNeeded(origspec, nodeName, vpm, csiMigratedPluginManager, csiTranslator, pod.Namespace)
103+
spec, err := translateInTreeSpecToCSIIfNeeded(logger, origspec, nodeName, vpm, csiMigratedPluginManager, csiTranslator, pod.Namespace)
104104
if err != nil {
105105
return nil, fmt.Errorf(
106106
"error performing CSI migration checks and translation for inline volume %q: %v",
@@ -241,7 +241,7 @@ func ProcessPodVolumes(logger klog.Logger, pod *v1.Pod, addVolumes bool, desired
241241
return
242242
}
243243

244-
func translateInTreeSpecToCSIIfNeeded(spec *volume.Spec, nodeName types.NodeName, vpm *volume.VolumePluginMgr, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator, podNamespace string) (*volume.Spec, error) {
244+
func translateInTreeSpecToCSIIfNeeded(logger klog.Logger, spec *volume.Spec, nodeName types.NodeName, vpm *volume.VolumePluginMgr, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator, podNamespace string) (*volume.Spec, error) {
245245
translatedSpec := spec
246246
migratable, err := csiMigratedPluginManager.IsMigratable(spec)
247247
if err != nil {
@@ -256,7 +256,7 @@ func translateInTreeSpecToCSIIfNeeded(spec *volume.Spec, nodeName types.NodeName
256256
return nil, err
257257
}
258258
if migratable && migrationSupportedOnNode {
259-
translatedSpec, err = csimigration.TranslateInTreeSpecToCSI(spec, podNamespace, csiTranslator)
259+
translatedSpec, err = csimigration.TranslateInTreeSpecToCSI(logger, spec, podNamespace, csiTranslator)
260260
if err != nil {
261261
return nil, err
262262
}

pkg/kubelet/kubelet.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1648,7 +1648,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
16481648
kl.warnCgroupV1Usage()
16491649

16501650
// Start volume manager
1651-
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
1651+
go kl.volumeManager.Run(ctx, kl.sourcesReady)
16521652

16531653
if kl.kubeClient != nil {
16541654
// Start two go-routines to update the status.

pkg/kubelet/kubelet_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2997,12 +2997,6 @@ func simulateVolumeInUseUpdate(
29972997
}
29982998
}
29992999

3000-
func runVolumeManager(kubelet *Kubelet) chan struct{} {
3001-
stopCh := make(chan struct{})
3002-
go kubelet.volumeManager.Run(kubelet.sourcesReady, stopCh)
3003-
return stopCh
3004-
}
3005-
30063000
// dirExists returns true if the path exists and represents a directory.
30073001
func dirExists(path string) bool {
30083002
s, err := os.Stat(path)

pkg/kubelet/kubelet_volumes_test.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"k8s.io/kubernetes/pkg/volume"
3232
volumetest "k8s.io/kubernetes/pkg/volume/testing"
3333
"k8s.io/kubernetes/pkg/volume/util"
34+
"k8s.io/kubernetes/test/utils/ktesting"
3435
)
3536

3637
func TestListVolumesForPod(t *testing.T) {
@@ -78,8 +79,9 @@ func TestListVolumesForPod(t *testing.T) {
7879
},
7980
})
8081

81-
stopCh := runVolumeManager(kubelet)
82-
defer close(stopCh)
82+
tCtx := ktesting.Init(t)
83+
defer tCtx.Cancel("test has completed")
84+
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
8385

8486
kubelet.podManager.SetPods([]*v1.Pod{pod})
8587
err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
@@ -196,8 +198,9 @@ func TestPodVolumesExist(t *testing.T) {
196198
},
197199
}
198200

199-
stopCh := runVolumeManager(kubelet)
200-
defer close(stopCh)
201+
tCtx := ktesting.Init(t)
202+
defer tCtx.Cancel("test has completed")
203+
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
201204

202205
kubelet.podManager.SetPods(pods)
203206
for _, pod := range pods {
@@ -255,8 +258,9 @@ func TestPodVolumeDeadlineAttachAndMount(t *testing.T) {
255258
},
256259
}
257260

258-
stopCh := runVolumeManager(kubelet)
259-
defer close(stopCh)
261+
tCtx := ktesting.Init(t)
262+
defer tCtx.Cancel("test has completed")
263+
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
260264

261265
kubelet.podManager.SetPods(pods)
262266
for _, pod := range pods {
@@ -316,8 +320,9 @@ func TestPodVolumeDeadlineUnmount(t *testing.T) {
316320
},
317321
}
318322

319-
stopCh := runVolumeManager(kubelet)
320-
defer close(stopCh)
323+
tCtx := ktesting.Init(t)
324+
defer tCtx.Cancel("test has completed")
325+
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
321326

322327
kubelet.podManager.SetPods(pods)
323328
for i, pod := range pods {
@@ -369,8 +374,9 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
369374
},
370375
})
371376

372-
stopCh := runVolumeManager(kubelet)
373-
defer close(stopCh)
377+
tCtx := ktesting.Init(t)
378+
defer tCtx.Cancel("test has completed")
379+
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
374380

375381
kubelet.podManager.SetPods([]*v1.Pod{pod})
376382
err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
@@ -428,8 +434,9 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
428434
},
429435
})
430436

431-
stopCh := runVolumeManager(kubelet)
432-
defer close(stopCh)
437+
tCtx := ktesting.Init(t)
438+
defer tCtx.Cancel("test has completed")
439+
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
433440

434441
// Add pod
435442
kubelet.podManager.SetPods([]*v1.Pod{pod})
@@ -534,15 +541,16 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
534541
},
535542
})
536543

537-
stopCh := runVolumeManager(kubelet)
538-
defer close(stopCh)
544+
tCtx := ktesting.Init(t)
545+
defer tCtx.Cancel("test has completed")
546+
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
539547

540548
kubelet.podManager.SetPods([]*v1.Pod{pod})
541549

542550
// Fake node status update
543551
go simulateVolumeInUseUpdate(
544552
v1.UniqueVolumeName("fake/fake-device"),
545-
stopCh,
553+
tCtx.Done(),
546554
kubelet.volumeManager)
547555

548556
assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod))
@@ -618,16 +626,17 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
618626
},
619627
})
620628

621-
stopCh := runVolumeManager(kubelet)
622-
defer close(stopCh)
629+
tCtx := ktesting.Init(t)
630+
defer tCtx.Cancel("test has completed")
631+
go kubelet.volumeManager.Run(tCtx, kubelet.sourcesReady)
623632

624633
// Add pod
625634
kubelet.podManager.SetPods([]*v1.Pod{pod})
626635

627636
// Fake node status update
628637
go simulateVolumeInUseUpdate(
629638
v1.UniqueVolumeName("fake/fake-device"),
630-
stopCh,
639+
tCtx.Done(),
631640
kubelet.volumeManager)
632641

633642
// Verify volumes attached

pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import (
5050
// if it has volumes. It also verifies that the pods in the desired state of the
5151
// world cache still exist, if not, it removes them.
5252
type DesiredStateOfWorldPopulator interface {
53-
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
53+
Run(ctx context.Context, sourcesReady config.SourcesReady)
5454

5555
// ReprocessPod sets value for the specified pod in processedPods
5656
// to false, forcing it to be reprocessed. This is required to enable
@@ -141,21 +141,22 @@ type processedPods struct {
141141
sync.RWMutex
142142
}
143143

144-
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
144+
func (dswp *desiredStateOfWorldPopulator) Run(ctx context.Context, sourcesReady config.SourcesReady) {
145145
// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
146-
klog.InfoS("Desired state populator starts to run")
147-
wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
146+
logger := klog.FromContext(ctx)
147+
logger.Info("Desired state populator starts to run")
148+
_ = wait.PollUntilContextCancel(ctx, dswp.loopSleepDuration, false, func(ctx context.Context) (bool, error) {
148149
done := sourcesReady.AllReady()
149-
dswp.populatorLoop()
150+
dswp.populatorLoop(ctx)
150151
return done, nil
151-
}, stopCh)
152+
})
152153
dswp.hasAddedPodsLock.Lock()
153154
if !dswp.hasAddedPods {
154-
klog.InfoS("Finished populating initial desired state of world")
155+
logger.Info("Finished populating initial desired state of world")
155156
dswp.hasAddedPods = true
156157
}
157158
dswp.hasAddedPodsLock.Unlock()
158-
wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
159+
wait.UntilWithContext(ctx, dswp.populatorLoop, dswp.loopSleepDuration)
159160
}
160161

161162
func (dswp *desiredStateOfWorldPopulator) ReprocessPod(
@@ -169,14 +170,14 @@ func (dswp *desiredStateOfWorldPopulator) HasAddedPods() bool {
169170
return dswp.hasAddedPods
170171
}
171172

172-
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
173-
dswp.findAndAddNewPods()
173+
func (dswp *desiredStateOfWorldPopulator) populatorLoop(ctx context.Context) {
174+
dswp.findAndAddNewPods(ctx)
174175
dswp.findAndRemoveDeletedPods()
175176
}
176177

177178
// Iterate through all pods and add to desired state of world if they don't
178179
// exist but should
179-
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
180+
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods(ctx context.Context) {
180181
// Map unique pod name to outer volume name to MountedVolume.
181182
mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
182183
for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
@@ -201,7 +202,7 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
201202
continue
202203
}
203204

204-
dswp.processPodVolumes(pod, mountedVolumesForPod)
205+
dswp.processPodVolumes(ctx, pod, mountedVolumesForPod)
205206
}
206207
}
207208

@@ -284,12 +285,14 @@ func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
284285
// processPodVolumes processes the volumes in the given pod and adds them to the
285286
// desired state of the world.
286287
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
288+
ctx context.Context,
287289
pod *v1.Pod,
288290
mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume) {
289291
if pod == nil {
290292
return
291293
}
292294

295+
logger := klog.FromContext(ctx)
293296
uniquePodName := util.GetUniquePodName(pod)
294297
if dswp.podPreviouslyProcessed(uniquePodName) {
295298
return
@@ -302,14 +305,14 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
302305
for _, podVolume := range pod.Spec.Volumes {
303306
if !mounts.Has(podVolume.Name) && !devices.Has(podVolume.Name) {
304307
// Volume is not used in the pod, ignore it.
305-
klog.V(4).InfoS("Skipping unused volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
308+
logger.V(4).Info("Skipping unused volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
306309
continue
307310
}
308311

309312
pvc, volumeSpec, volumeGidValue, err :=
310-
dswp.createVolumeSpec(podVolume, pod, mounts, devices)
313+
dswp.createVolumeSpec(logger, podVolume, pod, mounts, devices)
311314
if err != nil {
312-
klog.ErrorS(err, "Error processing volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
315+
logger.Error(err, "Error processing volume", "pod", klog.KObj(pod), "volumeName", podVolume.Name)
313316
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
314317
allVolumesAdded = false
315318
continue
@@ -319,11 +322,11 @@ func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
319322
_, err = dswp.desiredStateOfWorld.AddPodToVolume(
320323
uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue, seLinuxContainerContexts[podVolume.Name])
321324
if err != nil {
322-
klog.ErrorS(err, "Failed to add volume to desiredStateOfWorld", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
325+
logger.Error(err, "Failed to add volume to desiredStateOfWorld", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
323326
dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
324327
allVolumesAdded = false
325328
} else {
326-
klog.V(4).InfoS("Added volume to desired state", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
329+
logger.V(4).Info("Added volume to desired state", "pod", klog.KObj(pod), "volumeName", podVolume.Name, "volumeSpecName", volumeSpec.Name())
327330
}
328331

329332
dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec, uniquePodName, mountedVolumesForPod)
@@ -455,7 +458,7 @@ func (dswp *desiredStateOfWorldPopulator) deleteProcessedPod(
455458
// specified volume. It dereference any PVC to get PV objects, if needed.
456459
// Returns an error if unable to obtain the volume at this time.
457460
func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
458-
podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.Set[string]) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
461+
logger klog.Logger, podVolume v1.Volume, pod *v1.Pod, mounts, devices sets.Set[string]) (*v1.PersistentVolumeClaim, *volume.Spec, string, error) {
459462
pvcSource := podVolume.VolumeSource.PersistentVolumeClaim
460463
isEphemeral := pvcSource == nil && podVolume.VolumeSource.Ephemeral != nil
461464
if isEphemeral {
@@ -468,7 +471,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
468471
}
469472
}
470473
if pvcSource != nil {
471-
klog.V(5).InfoS("Found PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName))
474+
logger.V(5).Info("Found PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName))
472475
// If podVolume is a PVC, fetch the real PV behind the claim
473476
pvc, err := dswp.getPVCExtractPV(
474477
pod.Namespace, pvcSource.ClaimName)
@@ -485,7 +488,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
485488
}
486489
}
487490
pvName, pvcUID := pvc.Spec.VolumeName, pvc.UID
488-
klog.V(5).InfoS("Found bound PV for PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName)
491+
logger.V(5).Info("Found bound PV for PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName)
489492
// Fetch actual PV object
490493
volumeSpec, volumeGidValue, err :=
491494
dswp.getPVSpec(pvName, pvcSource.ReadOnly, pvcUID)
@@ -496,13 +499,13 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
496499
pvcSource.ClaimName,
497500
err)
498501
}
499-
klog.V(5).InfoS("Extracted volumeSpec from bound PV and PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName, "volumeSpecName", volumeSpec.Name())
502+
logger.V(5).Info("Extracted volumeSpec from bound PV and PVC", "PVC", klog.KRef(pod.Namespace, pvcSource.ClaimName), "PVCUID", pvcUID, "PVName", pvName, "volumeSpecName", volumeSpec.Name())
500503
migratable, err := dswp.csiMigratedPluginManager.IsMigratable(volumeSpec)
501504
if err != nil {
502505
return nil, nil, "", err
503506
}
504507
if migratable {
505-
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(volumeSpec, pod.Namespace, dswp.intreeToCSITranslator)
508+
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(logger, volumeSpec, pod.Namespace, dswp.intreeToCSITranslator)
506509
if err != nil {
507510
return nil, nil, "", err
508511
}
@@ -538,7 +541,7 @@ func (dswp *desiredStateOfWorldPopulator) createVolumeSpec(
538541
return nil, nil, "", err
539542
}
540543
if migratable {
541-
spec, err = csimigration.TranslateInTreeSpecToCSI(spec, pod.Namespace, dswp.intreeToCSITranslator)
544+
spec, err = csimigration.TranslateInTreeSpecToCSI(logger, spec, pod.Namespace, dswp.intreeToCSITranslator)
542545
if err != nil {
543546
return nil, nil, "", err
544547
}

0 commit comments

Comments (0)