Skip to content

Commit da9db64

Browse files
authored
Merge pull request kubernetes#87978 from jsafrane/block-csi-test
Add CSI block volume directory cleanup
2 parents 5cf6507 + 073d0b2 commit da9db64

File tree

6 files changed

+331
-40
lines changed

6 files changed

+331
-40
lines changed

pkg/volume/csi/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ go_library(
1717
visibility = ["//visibility:public"],
1818
deps = [
1919
"//pkg/features:go_default_library",
20+
"//pkg/util/removeall:go_default_library",
2021
"//pkg/volume:go_default_library",
2122
"//pkg/volume/csi/nodeinfomanager:go_default_library",
2223
"//pkg/volume/util:go_default_library",

pkg/volume/csi/csi_block.go

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,15 @@ import (
7272
"os"
7373
"path/filepath"
7474

75-
"k8s.io/klog"
76-
77-
"k8s.io/api/core/v1"
75+
v1 "k8s.io/api/core/v1"
7876
storage "k8s.io/api/storage/v1"
7977
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
8078
"k8s.io/apimachinery/pkg/types"
8179
"k8s.io/client-go/kubernetes"
80+
"k8s.io/klog"
81+
"k8s.io/kubernetes/pkg/util/removeall"
8282
"k8s.io/kubernetes/pkg/volume"
83+
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
8384
utilstrings "k8s.io/utils/strings"
8485
)
8586

@@ -113,10 +114,16 @@ func (m *csiBlockMapper) getStagingPath() string {
113114
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
114115
}
115116

117+
// getPublishDir returns path to a directory, where the volume is published to each pod.
118+
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}
119+
func (m *csiBlockMapper) getPublishDir() string {
120+
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName)
121+
}
122+
116123
// getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
117124
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID}
118125
func (m *csiBlockMapper) getPublishPath() string {
119-
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName, string(m.podUID))
126+
return filepath.Join(m.getPublishDir(), string(m.podUID))
120127
}
121128

122129
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
@@ -299,6 +306,13 @@ func (m *csiBlockMapper) SetUpDevice() error {
299306
// Call NodeStageVolume
300307
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
301308
if err != nil {
309+
if volumetypes.IsOperationFinishedError(err) {
310+
cleanupErr := m.cleanupOrphanDeviceFiles()
311+
if cleanupErr != nil {
312+
// V(4) for not so serious error
313+
klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr)
314+
}
315+
}
302316
return err
303317
}
304318

@@ -435,6 +449,41 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
435449
return err
436450
}
437451
}
452+
if err = m.cleanupOrphanDeviceFiles(); err != nil {
453+
// V(4) for not so serious error
454+
klog.V(4).Infof("Failed to clean up block volume directory %s", err)
455+
}
456+
457+
return nil
458+
}
459+
460+
// Clean up any orphan files / directories when a block volume is being unstaged.
461+
// At this point we can be sure that there is no pod using the volume and all
462+
// files are indeed orphaned.
463+
func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
464+
// Remove artifacts of NodePublish.
465+
// publishDir: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/<volume name>
466+
// Each PublishVolume() created a subdirectory there. Since everything should be
467+
// already unpublished at this point, the directory should be empty by now.
468+
publishDir := m.getPublishDir()
469+
if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) {
470+
return errors.New(log("failed to remove publish directory [%s]: %v", publishDir, err))
471+
}
472+
473+
// Remove artifacts of NodeStage.
474+
// stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/<volume name>
475+
stagingPath := m.getStagingPath()
476+
if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) {
477+
return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err))
478+
}
479+
480+
// Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/<volume name>.
481+
// At this point it contains only "data/vol_data.json" and empty "dev/".
482+
volumeDir := getVolumePluginDir(m.specName, m.plugin.host)
483+
mounter := m.plugin.host.GetMounter(m.plugin.GetPluginName())
484+
if err := removeall.RemoveAllOneFilesystem(mounter, volumeDir); err != nil {
485+
return err
486+
}
438487

439488
return nil
440489
}

pkg/volume/csi/csi_block_test.go

Lines changed: 171 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package csi
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"os"
2324
"path/filepath"
@@ -282,7 +283,7 @@ func TestBlockMapperSetupDevice(t *testing.T) {
282283
}
283284
}
284285

285-
func TestBlockMapperMapPodDevice(t *testing.T) {
286+
func TestBlockMapperSetupDeviceError(t *testing.T) {
286287
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
287288

288289
plug, tmpDir := newTestPlugin(t, nil)
@@ -297,11 +298,59 @@ func TestBlockMapperMapPodDevice(t *testing.T) {
297298
nodeName := string(plug.host.GetNodeName())
298299

299300
csiMapper.csiClient = setupClient(t, true)
301+
fClient := csiMapper.csiClient.(*fakeCsiDriverClient)
302+
fClient.nodeClient.SetNextError(errors.New("mock final error"))
300303

301304
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
302305
attachment := makeTestAttachment(attachID, nodeName, pvName)
303306
attachment.Status.Attached = true
304-
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
307+
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{})
308+
if err != nil {
309+
t.Fatalf("failed to setup VolumeAttachment: %v", err)
310+
}
311+
t.Log("created attachement ", attachID)
312+
313+
err = csiMapper.SetUpDevice()
314+
if err == nil {
315+
t.Fatal("mapper unexpectedly succeeded")
316+
}
317+
318+
// Check that all directories have been cleaned
319+
// Check that all metadata / staging / publish directories were deleted
320+
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
321+
if _, err := os.Stat(dataDir); err == nil {
322+
t.Errorf("volume publish data directory %s was not deleted", dataDir)
323+
}
324+
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
325+
if _, err := os.Stat(devDir); err == nil {
326+
t.Errorf("volume publish device directory %s was not deleted", devDir)
327+
}
328+
stagingPath := csiMapper.getStagingPath()
329+
if _, err := os.Stat(stagingPath); err == nil {
330+
t.Errorf("volume staging path %s was not deleted", stagingPath)
331+
}
332+
}
333+
334+
func TestBlockMapperMapPodDevice(t *testing.T) {
335+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
336+
337+
plug, tmpDir := newTestPlugin(t, nil)
338+
defer os.RemoveAll(tmpDir)
339+
340+
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
341+
if err != nil {
342+
t.Fatalf("Failed to make a new Mapper: %v", err)
343+
}
344+
345+
pvName := pv.GetName()
346+
nodeName := string(plug.host.GetNodeName())
347+
348+
csiMapper.csiClient = setupClient(t, true)
349+
350+
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), nodeName)
351+
attachment := makeTestAttachment(attachID, nodeName, pvName)
352+
attachment.Status.Attached = true
353+
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{})
305354
if err != nil {
306355
t.Fatalf("failed to setup VolumeAttachment: %v", err)
307356
}
@@ -430,3 +479,123 @@ func TestBlockMapperTearDownDevice(t *testing.T) {
430479
t.Error("csi server may not have received NodeUnstageVolume call")
431480
}
432481
}
482+
483+
func TestVolumeSetupTeardown(t *testing.T) {
484+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
485+
486+
// Follow volume setup + teardown sequences at top of cs_block.go and set up / clean up one CSI block device.
487+
// Focus on testing that there were no leftover files present after the cleanup.
488+
489+
plug, tmpDir := newTestPlugin(t, nil)
490+
defer os.RemoveAll(tmpDir)
491+
492+
csiMapper, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t)
493+
if err != nil {
494+
t.Fatalf("Failed to make a new Mapper: %v", err)
495+
}
496+
497+
pvName := pv.GetName()
498+
nodeName := string(plug.host.GetNodeName())
499+
500+
csiMapper.csiClient = setupClient(t, true)
501+
502+
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName))
503+
attachment := makeTestAttachment(attachID, nodeName, pvName)
504+
attachment.Status.Attached = true
505+
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
506+
if err != nil {
507+
t.Fatalf("failed to setup VolumeAttachment: %v", err)
508+
}
509+
t.Log("created attachement ", attachID)
510+
511+
err = csiMapper.SetUpDevice()
512+
if err != nil {
513+
t.Fatalf("mapper failed to SetupDevice: %v", err)
514+
}
515+
// Check if NodeStageVolume staged to the right path
516+
stagingPath := csiMapper.getStagingPath()
517+
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
518+
svol, ok := svols[csiMapper.volumeID]
519+
if !ok {
520+
t.Error("csi server may not have received NodeStageVolume call")
521+
}
522+
if svol.Path != stagingPath {
523+
t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
524+
}
525+
526+
path, err := csiMapper.MapPodDevice()
527+
if err != nil {
528+
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
529+
}
530+
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
531+
pvol, ok := pvols[csiMapper.volumeID]
532+
if !ok {
533+
t.Error("csi server may not have received NodePublishVolume call")
534+
}
535+
publishPath := csiMapper.getPublishPath()
536+
if pvol.Path != publishPath {
537+
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
538+
}
539+
if path != publishPath {
540+
t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path)
541+
}
542+
543+
unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
544+
if err != nil {
545+
t.Fatalf("failed to make a new Unmapper: %v", err)
546+
}
547+
548+
csiUnmapper := unmapper.(*csiBlockMapper)
549+
csiUnmapper.csiClient = csiMapper.csiClient
550+
551+
globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec)
552+
if err != nil {
553+
t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err)
554+
}
555+
556+
err = csiUnmapper.UnmapPodDevice()
557+
if err != nil {
558+
t.Errorf("unmapper failed to call UnmapPodDevice: %v", err)
559+
}
560+
561+
// GenerateUnmapDeviceFunc uses "" as pod UUID, it is global operation over all pods that used the volume
562+
unmapper, err = plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, "")
563+
if err != nil {
564+
t.Fatalf("failed to make a new Unmapper: %v", err)
565+
}
566+
csiUnmapper = unmapper.(*csiBlockMapper)
567+
csiUnmapper.csiClient = csiMapper.csiClient
568+
569+
err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test")
570+
if err != nil {
571+
t.Fatal(err)
572+
}
573+
pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
574+
if _, ok := pubs[csiUnmapper.volumeID]; ok {
575+
t.Error("csi server may not have received NodeUnpublishVolume call")
576+
}
577+
vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
578+
if _, ok := vols[csiUnmapper.volumeID]; ok {
579+
t.Error("csi server may not have received NodeUnstageVolume call")
580+
}
581+
582+
// Check that all metadata / staging / publish directories were deleted
583+
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
584+
if _, err := os.Stat(dataDir); err == nil {
585+
t.Errorf("volume publish data directory %s was not deleted", dataDir)
586+
}
587+
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
588+
if _, err := os.Stat(devDir); err == nil {
589+
t.Errorf("volume publish device directory %s was not deleted", devDir)
590+
}
591+
if _, err := os.Stat(publishPath); err == nil {
592+
t.Errorf("volume publish path %s was not deleted", publishPath)
593+
}
594+
publishDir := filepath.Dir(publishPath)
595+
if _, err := os.Stat(publishDir); err == nil {
596+
t.Errorf("volume publish parent directory %s was not deleted", publishDir)
597+
}
598+
if _, err := os.Stat(stagingPath); err == nil {
599+
t.Errorf("volume staging path %s was not deleted", stagingPath)
600+
}
601+
}

0 commit comments

Comments
 (0)