Skip to content

Commit cb2684c

Browse files
authored
Merge pull request kubernetes#74026 from mkimuram/issue/73773
Separate staging/publish and unstaging/unpublish logics for block
2 parents 97d45fe + 4578c6c commit cb2684c

File tree

3 files changed

+197
-54
lines changed

3 files changed

+197
-54
lines changed

pkg/volume/csi/csi_block.go

Lines changed: 140 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,55 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17+
/*
18+
This file defines block volume related methods for CSI driver.
19+
CSI driver is responsible for staging/publishing volumes to their staging/publish paths.
20+
Mapping and unmapping of a device in a publish path to its global map path and its
21+
pod device map path are done by operation_executor through MapBlockVolume/UnmapBlockVolume
22+
(MapBlockVolume and UnmapBlockVolume take care for lock, symlink, and bind mount).
23+
24+
Summary of block volume related CSI driver's methods are as follows:
25+
- GetGlobalMapPath returns a global map path,
26+
- GetPodDeviceMapPath returns a pod device map path and filename,
27+
- SetUpDevice calls CSI's NodeStageVolume and stage a volume to its staging path,
28+
- MapPodDevice calls CSI's NodePublishVolume and publish a volume to its publish path,
29+
- UnmapPodDevice calls CSI's NodeUnpublishVolume and unpublish a volume from its publish path,
30+
- TearDownDevice calls CSI's NodeUnstageVolume and unstage a volume from its staging path.
31+
32+
These methods are called by below sequences:
33+
- operation_executor.MountVolume
34+
- csi.GetGlobalMapPath
35+
- csi.SetupDevice
36+
- NodeStageVolume
37+
- ASW.MarkDeviceAsMounted
38+
- csi.GetPodDeviceMapPath
39+
- csi.MapPodDevice
40+
- NodePublishVolume
41+
- util.MapBlockVolume
42+
- ASW.MarkVolumeAsMounted
43+
44+
- operation_executor.UnmountVolume
45+
- csi.GetPodDeviceMapPath
46+
- util.UnmapBlockVolume
47+
- csi.UnmapPodDevice
48+
- NodeUnpublishVolume
49+
- ASW.MarkVolumeAsUnmounted
50+
51+
- operation_executor.UnmountDevice
52+
- csi.TearDownDevice
53+
- NodeUnstageVolume
54+
- ASW.MarkDeviceAsUnmounted
55+
56+
After successful MountVolume for block volume, directory structure will be like below:
57+
/dev/loopX ... Descriptor lock(Loopback device to mapFile under global map path)
58+
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/ ... Global map path
59+
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/{podUID} ... MapFile(Bind mount to publish Path)
60+
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/staging/{specName} ... Staging path
61+
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID} ... Publish path
62+
/var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/ ... Pod device map path
63+
/var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/{specName} ... MapFile(Symlink to publish path)
64+
*/
65+
1766
package csi
1867

1968
import (
@@ -51,34 +100,31 @@ var _ volume.BlockVolumeMapper = &csiBlockMapper{}
51100
var _ volume.CustomBlockVolumeMapper = &csiBlockMapper{}
52101

53102
// GetGlobalMapPath returns a global map path (on the node) to a device file which will be symlinked to
54-
// Example: plugins/kubernetes.io/csi/volumeDevices/{pvname}/dev
103+
// Example: plugins/kubernetes.io/csi/volumeDevices/{specName}/dev
55104
func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
56-
dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host)
105+
dir := getVolumeDevicePluginDir(m.specName, m.plugin.host)
57106
klog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
58107
return dir, nil
59108
}
60109

61110
// getStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
62-
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{pvname}
111+
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{specName}
63112
func (m *csiBlockMapper) getStagingPath() string {
64-
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
65-
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", sanitizedSpecVolID)
113+
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
66114
}
67115

68116
// getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
69-
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{pvname}
117+
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID}
70118
func (m *csiBlockMapper) getPublishPath() string {
71-
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
72-
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", sanitizedSpecVolID)
119+
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName, string(m.podUID))
73120
}
74121

75122
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
76-
// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {pvname}
123+
// returns: pods/{podUID}/volumeDevices/kubernetes.io~csi, {specName}
77124
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
78125
path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName))
79-
specName := m.specName
80-
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName))
81-
return path, specName
126+
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, m.specName))
127+
return path, m.specName
82128
}
83129

84130
// stageVolumeForBlock stages a block volume to stagingPath
@@ -150,7 +196,6 @@ func (m *csiBlockMapper) publishVolumeForBlock(
150196
accessMode v1.PersistentVolumeAccessMode,
151197
csiSource *v1.CSIPersistentVolumeSource,
152198
attachment *storage.VolumeAttachment,
153-
stagingPath string,
154199
) (string, error) {
155200
klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called"))
156201

@@ -186,7 +231,7 @@ func (m *csiBlockMapper) publishVolumeForBlock(
186231
ctx,
187232
m.volumeID,
188233
m.readOnly,
189-
stagingPath,
234+
m.getStagingPath(),
190235
publishPath,
191236
accessMode,
192237
publishVolumeInfo,
@@ -252,13 +297,7 @@ func (m *csiBlockMapper) SetUpDevice() error {
252297
}
253298

254299
// Call NodeStageVolume
255-
stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
256-
if err != nil {
257-
return err
258-
}
259-
260-
// Call NodePublishVolume
261-
_, err = m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath)
300+
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
262301
if err != nil {
263302
return err
264303
}
@@ -267,7 +306,59 @@ func (m *csiBlockMapper) SetUpDevice() error {
267306
}
268307

269308
func (m *csiBlockMapper) MapPodDevice() (string, error) {
270-
return m.getPublishPath(), nil
309+
if !m.plugin.blockEnabled {
310+
return "", errors.New("CSIBlockVolume feature not enabled")
311+
}
312+
klog.V(4).Infof(log("blockMapper.MapPodDevice called"))
313+
314+
// Get csiSource from spec
315+
if m.spec == nil {
316+
return "", errors.New(log("blockMapper.MapPodDevice spec is nil"))
317+
}
318+
319+
csiSource, err := getCSISourceFromSpec(m.spec)
320+
if err != nil {
321+
return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI persistent source: %v", err))
322+
}
323+
324+
driverName := csiSource.Driver
325+
skip, err := m.plugin.skipAttach(driverName)
326+
if err != nil {
327+
return "", errors.New(log("blockMapper.MapPodDevice failed to check CSIDriver for %s: %v", driverName, err))
328+
}
329+
330+
var attachment *storage.VolumeAttachment
331+
if !skip {
332+
// Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
333+
nodeName := string(m.plugin.host.GetNodeName())
334+
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
335+
attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
336+
if err != nil {
337+
return "", errors.New(log("blockMapper.MapPodDevice failed to get volume attachment [id=%v]: %v", attachID, err))
338+
}
339+
}
340+
341+
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
342+
accessMode := v1.ReadWriteOnce
343+
if m.spec.PersistentVolume.Spec.AccessModes != nil {
344+
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
345+
}
346+
347+
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
348+
defer cancel()
349+
350+
csiClient, err := m.csiClientGetter.Get()
351+
if err != nil {
352+
return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI client: %v", err))
353+
}
354+
355+
// Call NodePublishVolume
356+
publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
357+
if err != nil {
358+
return "", err
359+
}
360+
361+
return publishPath, nil
271362
}
272363

273364
var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
@@ -322,8 +413,6 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
322413
return errors.New("CSIBlockVolume feature not enabled")
323414
}
324415

325-
klog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath))
326-
327416
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
328417
defer cancel()
329418

@@ -332,21 +421,6 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
332421
return errors.New(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
333422
}
334423

335-
// Call NodeUnpublishVolume
336-
publishPath := m.getPublishPath()
337-
if _, err := os.Stat(publishPath); err != nil {
338-
if os.IsNotExist(err) {
339-
klog.V(4).Infof(log("blockMapper.TearDownDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath))
340-
} else {
341-
return err
342-
}
343-
} else {
344-
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
345-
if err != nil {
346-
return err
347-
}
348-
}
349-
350424
// Call NodeUnstageVolume
351425
stagingPath := m.getStagingPath()
352426
if _, err := os.Stat(stagingPath); err != nil {
@@ -367,5 +441,32 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
367441

368442
// UnmapPodDevice unmaps the block device path.
369443
func (m *csiBlockMapper) UnmapPodDevice() error {
444+
if !m.plugin.blockEnabled {
445+
return errors.New("CSIBlockVolume feature not enabled")
446+
}
447+
publishPath := m.getPublishPath()
448+
449+
csiClient, err := m.csiClientGetter.Get()
450+
if err != nil {
451+
return errors.New(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err))
452+
}
453+
454+
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
455+
defer cancel()
456+
457+
// Call NodeUnpublishVolume
458+
if _, err := os.Stat(publishPath); err != nil {
459+
if os.IsNotExist(err) {
460+
klog.V(4).Infof(log("blockMapper.UnmapPodDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath))
461+
} else {
462+
return err
463+
}
464+
} else {
465+
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
466+
if err != nil {
467+
return err
468+
}
469+
}
470+
370471
return nil
371472
}

pkg/volume/csi/csi_block_test.go

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,40 @@ func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T
4949
return csiMapper, spec, pv, nil
5050
}
5151

52+
func prepareBlockUnmapperTest(plug *csiPlugin, specVolumeName string, t *testing.T) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) {
53+
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
54+
pv := makeTestPV(specVolumeName, 10, testDriver, testVol)
55+
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)
56+
57+
// save volume data
58+
dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
59+
if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) {
60+
t.Errorf("failed to create dir [%s]: %v", dir, err)
61+
}
62+
63+
if err := saveVolumeData(
64+
dir,
65+
volDataFileName,
66+
map[string]string{
67+
volDataKey.specVolID: pv.ObjectMeta.Name,
68+
volDataKey.driverName: testDriver,
69+
volDataKey.volHandle: testVol,
70+
},
71+
); err != nil {
72+
t.Fatalf("failed to save volume data: %v", err)
73+
}
74+
75+
unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
76+
if err != nil {
77+
t.Fatalf("failed to make a new Unmapper: %v", err)
78+
}
79+
80+
csiUnmapper := unmapper.(*csiBlockMapper)
81+
csiUnmapper.csiClient = setupClient(t, true)
82+
83+
return csiUnmapper, spec, pv, nil
84+
}
85+
5286
func TestBlockMapperGetGlobalMapPath(t *testing.T) {
5387
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()
5488

@@ -141,12 +175,12 @@ func TestBlockMapperGetPublishPath(t *testing.T) {
141175
{
142176
name: "simple specName",
143177
specVolumeName: "spec-0",
144-
path: filepath.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s", "spec-0")),
178+
path: filepath.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s/%s", "spec-0", testPodUID)),
145179
},
146180
{
147181
name: "specName with dots",
148182
specVolumeName: "test.spec.1",
149-
path: filepath.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s", "test.spec.1")),
183+
path: filepath.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s/%s", "test.spec.1", testPodUID)),
150184
},
151185
}
152186
for _, tc := range testCases {
@@ -254,18 +288,6 @@ func TestBlockMapperSetupDevice(t *testing.T) {
254288
if svol.Path != stagingPath {
255289
t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
256290
}
257-
258-
// Check if NodePublishVolume published to the right path
259-
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
260-
pvol, ok := pvols[csiMapper.volumeID]
261-
if !ok {
262-
t.Error("csi server may not have received NodePublishVolume call")
263-
}
264-
265-
publishPath := csiMapper.getPublishPath()
266-
if pvol.Path != publishPath {
267-
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
268-
}
269291
}
270292

271293
func TestBlockMapperMapPodDevice(t *testing.T) {
@@ -307,9 +329,20 @@ func TestBlockMapperMapPodDevice(t *testing.T) {
307329
if err != nil {
308330
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
309331
}
332+
333+
// Check if NodePublishVolume published to the right path
334+
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
335+
pvol, ok := pvols[csiMapper.volumeID]
336+
if !ok {
337+
t.Error("csi server may not have received NodePublishVolume call")
338+
}
339+
310340
publishPath := csiMapper.getPublishPath()
341+
if pvol.Path != publishPath {
342+
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
343+
}
311344
if path != publishPath {
312-
t.Errorf("path %s and %s doesn't match", path, publishPath)
345+
t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path)
313346
}
314347
}
315348

pkg/volume/csi/fake/fake_client.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,20 @@ func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume {
120120
return f.nodePublishedVolumes
121121
}
122122

123+
// AddNodePublishedVolume adds specified volume to nodePublishedVolumes
124+
func (f *NodeClient) AddNodePublishedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
125+
f.nodePublishedVolumes[volID] = CSIVolume{
126+
Path: deviceMountPath,
127+
VolumeContext: volumeContext,
128+
}
129+
}
130+
123131
// GetNodeStagedVolumes returns node staged volumes
124132
func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume {
125133
return f.nodeStagedVolumes
126134
}
127135

136+
// AddNodeStagedVolume adds specified volume to nodeStagedVolumes
128137
func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
129138
f.nodeStagedVolumes[volID] = CSIVolume{
130139
Path: deviceMountPath,

0 commit comments

Comments
 (0)