Skip to content

Commit e2d8e57

Browse files
committed
Add CSI block volume directory cleanup
CSI volume plugin creates number of files/directories when processing block volumes. These files must be cleaned when the plugin is done with the volume, i.e. at the end on TearDownDevice().
1 parent 8ca96f3 commit e2d8e57

File tree

4 files changed

+328
-30
lines changed

4 files changed

+328
-30
lines changed

pkg/volume/csi/csi_block.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,11 @@ import (
7272
"os"
7373
"path/filepath"
7474

75+
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
76+
7577
"k8s.io/klog"
7678

77-
"k8s.io/api/core/v1"
79+
v1 "k8s.io/api/core/v1"
7880
storage "k8s.io/api/storage/v1"
7981
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
8082
"k8s.io/apimachinery/pkg/types"
@@ -299,6 +301,13 @@ func (m *csiBlockMapper) SetUpDevice() error {
299301
// Call NodeStageVolume
300302
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
301303
if err != nil {
304+
if volumetypes.IsOperationFinishedError(err) {
305+
cleanupErr := m.cleanupOrphanDeviceFiles()
306+
if cleanupErr != nil {
307+
// V(4) for not so serious error
308+
klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr)
309+
}
310+
}
302311
return err
303312
}
304313

@@ -435,6 +444,57 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
435444
return err
436445
}
437446
}
447+
if err = m.cleanupOrphanDeviceFiles(); err != nil {
448+
return err
449+
}
450+
451+
return nil
452+
}
453+
454+
// Clean up any orphan files / directories when a block volume is being unstaged.
455+
// At this point we can be sure that there is no pod using the volume and all
456+
// files are indeed orphaned.
457+
func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
458+
// Remove artifacts of NodePublish.
459+
// publishPath: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/<volume name>/<pod UUID>
460+
// publishPath was removed by the driver. We need to remove the <volume name>/ dir.
461+
publishPath := m.getPublishPath()
462+
publishDir := filepath.Dir(publishPath)
463+
if m.podUID == "" {
464+
// Pod UID is not known during device teardown ("NodeUnstage").
465+
// getPublishPath() squashed "<volume name>/<pod UUID>" into "<volume name>/".
466+
publishDir = publishPath
467+
}
468+
if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) {
469+
return errors.New(log("failed to publish directory [%s]: %v", publishDir, err))
470+
}
471+
472+
// Remove artifacts of NodeStage.
473+
// stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/<volume name>
474+
stagingPath := m.getStagingPath()
475+
if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) {
476+
return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err))
477+
}
478+
479+
// Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/<volume name>.
480+
// At this point it contains only "data/vol_data.json" and empty "dev/".
481+
dataDir := getVolumeDeviceDataDir(m.specName, m.plugin.host)
482+
dataFile := filepath.Join(dataDir, volDataFileName)
483+
if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) {
484+
return errors.New(log("failed to delete volume data file [%s]: %v", dataFile, err))
485+
}
486+
if err := os.Remove(dataDir); err != nil && !os.IsNotExist(err) {
487+
return errors.New(log("failed to delete volume data directory [%s]: %v", dataDir, err))
488+
}
489+
490+
volumeDir := filepath.Dir(dataDir)
491+
deviceDir := filepath.Join(volumeDir, "dev")
492+
if err := os.Remove(deviceDir); err != nil && !os.IsNotExist(err) {
493+
return errors.New(log("failed to delete volume directory [%s]: %v", deviceDir, err))
494+
}
495+
if err := os.Remove(volumeDir); err != nil && !os.IsNotExist(err) {
496+
return errors.New(log("failed to delete volume directory [%s]: %v", volumeDir, err))
497+
}
438498

439499
return nil
440500
}

pkg/volume/csi/csi_block_test.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package csi
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"os"
2324
"path/filepath"
@@ -282,6 +283,54 @@ func TestBlockMapperSetupDevice(t *testing.T) {
282283
}
283284
}
284285

286+
func TestBlockMapperSetupDeviceError(t *testing.T) {
287+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
288+
289+
plug, tmpDir := newTestPlugin(t, nil)
290+
defer os.RemoveAll(tmpDir)
291+
292+
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
293+
if err != nil {
294+
t.Fatalf("Failed to make a new Mapper: %v", err)
295+
}
296+
297+
pvName := pv.GetName()
298+
nodeName := string(plug.host.GetNodeName())
299+
300+
csiMapper.csiClient = setupClient(t, true)
301+
fClient := csiMapper.csiClient.(*fakeCsiDriverClient)
302+
fClient.nodeClient.SetNextError(errors.New("mock final error"))
303+
304+
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
305+
attachment := makeTestAttachment(attachID, nodeName, pvName)
306+
attachment.Status.Attached = true
307+
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
308+
if err != nil {
309+
t.Fatalf("failed to setup VolumeAttachment: %v", err)
310+
}
311+
t.Log("created attachement ", attachID)
312+
313+
err = csiMapper.SetUpDevice()
314+
if err == nil {
315+
t.Fatal("mapper unexpectedly succeeded")
316+
}
317+
318+
// Check that all directories have been cleaned
319+
// Check that all metadata / staging / publish directories were deleted
320+
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
321+
if _, err := os.Stat(dataDir); err == nil {
322+
t.Errorf("volume publish data directory %s was not deleted", dataDir)
323+
}
324+
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
325+
if _, err := os.Stat(devDir); err == nil {
326+
t.Errorf("volume publish device directory %s was not deleted", devDir)
327+
}
328+
stagingPath := csiMapper.getStagingPath()
329+
if _, err := os.Stat(stagingPath); err == nil {
330+
t.Errorf("volume staging path %s was not deleted", stagingPath)
331+
}
332+
}
333+
285334
func TestBlockMapperMapPodDevice(t *testing.T) {
286335
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
287336

@@ -430,3 +479,124 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
430479
t.Error("csi server may not have received NodeUnstageVolume call")
431480
}
432481
}
482+
483+
func TestVolumeSetupTeardown(t *testing.T) {
484+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
485+
486+
plug, tmpDir := newTestPlugin(t, nil)
487+
defer os.RemoveAll(tmpDir)
488+
489+
csiMapper, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
490+
if err != nil {
491+
t.Fatalf("Failed to make a new Mapper: %v", err)
492+
}
493+
494+
pvName := pv.GetName()
495+
nodeName := string(plug.host.GetNodeName())
496+
497+
csiMapper.csiClient = setupClient(t, true)
498+
499+
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
500+
attachment := makeTestAttachment(attachID, nodeName, pvName)
501+
attachment.Status.Attached = true
502+
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
503+
if err != nil {
504+
t.Fatalf("failed to setup VolumeAttachment: %v", err)
505+
}
506+
t.Log("created attachement ", attachID)
507+
508+
// SetupDevice
509+
err = csiMapper.SetUpDevice()
510+
if err != nil {
511+
t.Fatalf("mapper failed to SetupDevice: %v", err)
512+
}
513+
// Check if NodeStageVolume staged to the right path
514+
stagingPath := csiMapper.getStagingPath()
515+
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
516+
svol, ok := svols[csiMapper.volumeID]
517+
if !ok {
518+
t.Error("csi server may not have received NodeStageVolume call")
519+
}
520+
if svol.Path != stagingPath {
521+
t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
522+
}
523+
524+
// MapPodDevice
525+
path, err := csiMapper.MapPodDevice()
526+
if err != nil {
527+
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
528+
}
529+
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
530+
pvol, ok := pvols[csiMapper.volumeID]
531+
if !ok {
532+
t.Error("csi server may not have received NodePublishVolume call")
533+
}
534+
publishPath := csiMapper.getPublishPath()
535+
if pvol.Path != publishPath {
536+
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
537+
}
538+
if path != publishPath {
539+
t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path)
540+
}
541+
542+
unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
543+
if err != nil {
544+
t.Fatalf("failed to make a new Unmapper: %v", err)
545+
}
546+
547+
csiUnmapper := unmapper.(*csiBlockMapper)
548+
csiUnmapper.csiClient = csiMapper.csiClient
549+
550+
globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec)
551+
if err != nil {
552+
t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err)
553+
}
554+
555+
// UnmapDevice
556+
err = csiUnmapper.UnmapPodDevice()
557+
if err != nil {
558+
t.Errorf("unmapper failed to call UnmapPodDevice: %v", err)
559+
}
560+
561+
// GenerateUnmapDeviceFunc uses "" as pod UUID, it is global operation over all pods that used the volume
562+
unmapper, err = plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, "")
563+
if err != nil {
564+
t.Fatalf("failed to make a new Unmapper: %v", err)
565+
}
566+
csiUnmapper = unmapper.(*csiBlockMapper)
567+
csiUnmapper.csiClient = csiMapper.csiClient
568+
569+
// TearDownDevice
570+
err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test")
571+
if err != nil {
572+
t.Fatal(err)
573+
}
574+
pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
575+
if _, ok := pubs[csiUnmapper.volumeID]; ok {
576+
t.Error("csi server may not have received NodeUnpublishVolume call")
577+
}
578+
vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
579+
if _, ok := vols[csiUnmapper.volumeID]; ok {
580+
t.Error("csi server may not have received NodeUnstageVolume call")
581+
}
582+
583+
// Check that all metadata / staging / publish directories were deleted
584+
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
585+
if _, err := os.Stat(dataDir); err == nil {
586+
t.Errorf("volume publish data directory %s was not deleted", dataDir)
587+
}
588+
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
589+
if _, err := os.Stat(devDir); err == nil {
590+
t.Errorf("volume publish device directory %s was not deleted", devDir)
591+
}
592+
if _, err := os.Stat(publishPath); err == nil {
593+
t.Errorf("volume publish path %s was not deleted", publishPath)
594+
}
595+
publishDir := filepath.Dir(publishPath)
596+
if _, err := os.Stat(publishDir); err == nil {
597+
t.Errorf("volume publish parent directory %s was not deleted", publishDir)
598+
}
599+
if _, err := os.Stat(stagingPath); err == nil {
600+
t.Errorf("volume staging path %s was not deleted", stagingPath)
601+
}
602+
}

0 commit comments

Comments
 (0)