Skip to content

Commit 47a9952

Browse files
authored
Merge pull request kubernetes#86968 from gnufied/add-extra-csi-fields
Add extra fields in node expansion CSI call
2 parents 6cedc08 + 6342dad commit 47a9952

File tree

13 files changed

+262
-109
lines changed

13 files changed

+262
-109
lines changed

pkg/volume/csi/csi_block.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
107107
return dir, nil
108108
}
109109

110-
// getStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
110+
// GetStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
111111
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{specName}
112-
func (m *csiBlockMapper) getStagingPath() string {
112+
func (m *csiBlockMapper) GetStagingPath() string {
113113
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
114114
}
115115

@@ -143,7 +143,7 @@ func (m *csiBlockMapper) stageVolumeForBlock(
143143
) (string, error) {
144144
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock called"))
145145

146-
stagingPath := m.getStagingPath()
146+
stagingPath := m.GetStagingPath()
147147
klog.V(4).Infof(log("blockMapper.stageVolumeForBlock stagingPath set [%s]", stagingPath))
148148

149149
// Check whether "STAGE_UNSTAGE_VOLUME" is set
@@ -237,7 +237,7 @@ func (m *csiBlockMapper) publishVolumeForBlock(
237237
ctx,
238238
m.volumeID,
239239
m.readOnly,
240-
m.getStagingPath(),
240+
m.GetStagingPath(),
241241
publishPath,
242242
accessMode,
243243
publishVolumeInfo,
@@ -255,26 +255,26 @@ func (m *csiBlockMapper) publishVolumeForBlock(
255255
}
256256

257257
// SetUpDevice ensures the device is attached returns path where the device is located.
258-
func (m *csiBlockMapper) SetUpDevice() error {
258+
func (m *csiBlockMapper) SetUpDevice() (string, error) {
259259
if !m.plugin.blockEnabled {
260-
return errors.New("CSIBlockVolume feature not enabled")
260+
return "", errors.New("CSIBlockVolume feature not enabled")
261261
}
262262
klog.V(4).Infof(log("blockMapper.SetUpDevice called"))
263263

264264
// Get csiSource from spec
265265
if m.spec == nil {
266-
return errors.New(log("blockMapper.SetUpDevice spec is nil"))
266+
return "", errors.New(log("blockMapper.SetUpDevice spec is nil"))
267267
}
268268

269269
csiSource, err := getCSISourceFromSpec(m.spec)
270270
if err != nil {
271-
return errors.New(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err))
271+
return "", errors.New(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err))
272272
}
273273

274274
driverName := csiSource.Driver
275275
skip, err := m.plugin.skipAttach(driverName)
276276
if err != nil {
277-
return errors.New(log("blockMapper.SetupDevice failed to check CSIDriver for %s: %v", driverName, err))
277+
return "", errors.New(log("blockMapper.SetupDevice failed to check CSIDriver for %s: %v", driverName, err))
278278
}
279279

280280
var attachment *storage.VolumeAttachment
@@ -284,7 +284,7 @@ func (m *csiBlockMapper) SetUpDevice() error {
284284
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
285285
attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
286286
if err != nil {
287-
return errors.New(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
287+
return "", errors.New(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
288288
}
289289
}
290290

@@ -299,11 +299,11 @@ func (m *csiBlockMapper) SetUpDevice() error {
299299

300300
csiClient, err := m.csiClientGetter.Get()
301301
if err != nil {
302-
return errors.New(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
302+
return "", errors.New(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
303303
}
304304

305305
// Call NodeStageVolume
306-
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
306+
stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
307307
if err != nil {
308308
if volumetypes.IsOperationFinishedError(err) {
309309
cleanupErr := m.cleanupOrphanDeviceFiles()
@@ -312,10 +312,10 @@ func (m *csiBlockMapper) SetUpDevice() error {
312312
klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr)
313313
}
314314
}
315-
return err
315+
return "", err
316316
}
317317

318-
return nil
318+
return stagingPath, nil
319319
}
320320

321321
func (m *csiBlockMapper) MapPodDevice() (string, error) {
@@ -435,7 +435,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
435435
}
436436

437437
// Call NodeUnstageVolume
438-
stagingPath := m.getStagingPath()
438+
stagingPath := m.GetStagingPath()
439439
if _, err := os.Stat(stagingPath); err != nil {
440440
if os.IsNotExist(err) {
441441
klog.V(4).Infof(log("blockMapper.TearDownDevice stagingPath(%s) has already been deleted, skip calling NodeUnstageVolume", stagingPath))
@@ -471,7 +471,7 @@ func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
471471

472472
// Remove artifacts of NodeStage.
473473
// stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/<volume name>
474-
stagingPath := m.getStagingPath()
474+
stagingPath := m.GetStagingPath()
475475
if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) {
476476
return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err))
477477
}

pkg/volume/csi/csi_block_test.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func TestBlockMapperGetStagingPath(t *testing.T) {
123123
t.Fatalf("Failed to make a new Mapper: %v", err)
124124
}
125125

126-
path := csiMapper.getStagingPath()
126+
path := csiMapper.GetStagingPath()
127127

128128
if tc.path != path {
129129
t.Errorf("expecting path %s, got %s", tc.path, path)
@@ -234,13 +234,12 @@ func TestBlockMapperSetupDevice(t *testing.T) {
234234
}
235235
t.Log("created attachement ", attachID)
236236

237-
err = csiMapper.SetUpDevice()
237+
stagingPath, err := csiMapper.SetUpDevice()
238238
if err != nil {
239239
t.Fatalf("mapper failed to SetupDevice: %v", err)
240240
}
241241

242242
// Check if NodeStageVolume staged to the right path
243-
stagingPath := csiMapper.getStagingPath()
244243
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
245244
svol, ok := svols[csiMapper.volumeID]
246245
if !ok {
@@ -278,7 +277,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) {
278277
}
279278
t.Log("created attachement ", attachID)
280279

281-
err = csiMapper.SetUpDevice()
280+
stagingPath, err := csiMapper.SetUpDevice()
282281
if err == nil {
283282
t.Fatal("mapper unexpectedly succeeded")
284283
}
@@ -293,7 +292,7 @@ func TestBlockMapperSetupDeviceError(t *testing.T) {
293292
if _, err := os.Stat(devDir); err == nil {
294293
t.Errorf("volume publish device directory %s was not deleted", devDir)
295294
}
296-
stagingPath := csiMapper.getStagingPath()
295+
297296
if _, err := os.Stat(stagingPath); err == nil {
298297
t.Errorf("volume staging path %s was not deleted", stagingPath)
299298
}
@@ -475,12 +474,11 @@ func TestVolumeSetupTeardown(t *testing.T) {
475474
}
476475
t.Log("created attachement ", attachID)
477476

478-
err = csiMapper.SetUpDevice()
477+
stagingPath, err := csiMapper.SetUpDevice()
479478
if err != nil {
480479
t.Fatalf("mapper failed to SetupDevice: %v", err)
481480
}
482481
// Check if NodeStageVolume staged to the right path
483-
stagingPath := csiMapper.getStagingPath()
484482
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes()
485483
svol, ok := svols[csiMapper.volumeID]
486484
if !ok {

pkg/volume/csi/csi_client.go

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type csiClient interface {
5454
fsType string,
5555
mountOptions []string,
5656
) error
57-
NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error)
57+
NodeExpandVolume(ctx context.Context, rsOpts csiResizeOptions) (resource.Quantity, error)
5858
NodeUnpublishVolume(
5959
ctx context.Context,
6060
volID string,
@@ -95,6 +95,20 @@ type csiDriverClient struct {
9595
nodeV1ClientCreator nodeV1ClientCreator
9696
}
9797

98+
type csiResizeOptions struct {
99+
volumeID string
100+
// volumePath is path where volume is available. It could be:
101+
// - path where node is staged if NodeExpandVolume is called after NodeStageVolume
102+
// - path where volume is published if NodeExpandVolume is called after NodePublishVolume
103+
// DEPRECATION NOTICE: in future NodeExpandVolume will be always called after NodePublish
104+
volumePath string
105+
stagingTargetPath string
106+
fsType string
107+
accessMode api.PersistentVolumeAccessMode
108+
newSize resource.Quantity
109+
mountOptions []string
110+
}
111+
98112
var _ csiClient = &csiDriverClient{}
99113

100114
type nodeV1ClientCreator func(addr csiAddr) (
@@ -245,36 +259,61 @@ func (c *csiDriverClient) NodePublishVolume(
245259
return err
246260
}
247261

248-
func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
262+
func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
249263
if c.nodeV1ClientCreator == nil {
250-
return newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
264+
return opts.newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
251265
}
252266

253-
if volumeID == "" {
254-
return newSize, errors.New("missing volume id")
267+
if opts.volumeID == "" {
268+
return opts.newSize, errors.New("missing volume id")
255269
}
256-
if volumePath == "" {
257-
return newSize, errors.New("missing volume path")
270+
if opts.volumePath == "" {
271+
return opts.newSize, errors.New("missing volume path")
258272
}
259273

260-
if newSize.Value() < 0 {
261-
return newSize, errors.New("size can not be less than 0")
274+
if opts.newSize.Value() < 0 {
275+
return opts.newSize, errors.New("size can not be less than 0")
262276
}
263277

264278
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
265279
if err != nil {
266-
return newSize, err
280+
return opts.newSize, err
267281
}
268282
defer closer.Close()
269283

270284
req := &csipbv1.NodeExpandVolumeRequest{
271-
VolumeId: volumeID,
272-
VolumePath: volumePath,
273-
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
285+
VolumeId: opts.volumeID,
286+
VolumePath: opts.volumePath,
287+
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
288+
VolumeCapability: &csipbv1.VolumeCapability{
289+
AccessMode: &csipbv1.VolumeCapability_AccessMode{
290+
Mode: asCSIAccessModeV1(opts.accessMode),
291+
},
292+
},
293+
}
294+
295+
// not all CSI drivers support NodeStageUnstage and hence the StagingTargetPath
296+
// should only be set when available
297+
if opts.stagingTargetPath != "" {
298+
req.StagingTargetPath = opts.stagingTargetPath
274299
}
300+
301+
if opts.fsType == fsTypeBlockName {
302+
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
303+
Block: &csipbv1.VolumeCapability_BlockVolume{},
304+
}
305+
} else {
306+
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
307+
Mount: &csipbv1.VolumeCapability_MountVolume{
308+
FsType: opts.fsType,
309+
MountFlags: opts.mountOptions,
310+
},
311+
}
312+
}
313+
275314
resp, err := nodeClient.NodeExpandVolume(ctx, req)
276315
if err != nil {
277-
return newSize, err
316+
return opts.newSize, err
278317
}
279318
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
280319
return *updatedQuantity, nil

pkg/volume/csi/csi_client_test.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -284,16 +284,34 @@ func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (boo
284284
return stageUnstageSet, nil
285285
}
286286

287-
func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
287+
func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
288288
c.t.Log("calling fake.NodeExpandVolume")
289289
req := &csipbv1.NodeExpandVolumeRequest{
290-
VolumeId: volumeid,
291-
VolumePath: volumePath,
292-
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
290+
VolumeId: opts.volumeID,
291+
VolumePath: opts.volumePath,
292+
StagingTargetPath: opts.stagingTargetPath,
293+
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
294+
VolumeCapability: &csipbv1.VolumeCapability{
295+
AccessMode: &csipbv1.VolumeCapability_AccessMode{
296+
Mode: asCSIAccessModeV1(opts.accessMode),
297+
},
298+
},
299+
}
300+
if opts.fsType == fsTypeBlockName {
301+
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
302+
Block: &csipbv1.VolumeCapability_BlockVolume{},
303+
}
304+
} else {
305+
req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
306+
Mount: &csipbv1.VolumeCapability_MountVolume{
307+
FsType: opts.fsType,
308+
MountFlags: opts.mountOptions,
309+
},
310+
}
293311
}
294312
resp, err := c.nodeClient.NodeExpandVolume(ctx, req)
295313
if err != nil {
296-
return newSize, err
314+
return opts.newSize, err
297315
}
298316
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
299317
return *updatedQuantity, nil
@@ -635,7 +653,8 @@ func TestNodeExpandVolume(t *testing.T) {
635653
return nodeClient, fakeCloser, nil
636654
},
637655
}
638-
_, err := client.NodeExpandVolume(context.Background(), tc.volID, tc.volumePath, tc.newSize)
656+
opts := csiResizeOptions{volumeID: tc.volID, volumePath: tc.volumePath, newSize: tc.newSize}
657+
_, err := client.NodeExpandVolume(context.Background(), opts)
639658
checkErr(t, tc.mustFail, err)
640659
if !tc.mustFail {
641660
fakeCloser.Check()

pkg/volume/csi/expander.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,34 @@ func (c *csiPlugin) nodeExpandWithClient(
9494
return false, nil
9595
}
9696

97-
volumeTargetPath := resizeOptions.DeviceMountPath
97+
pv := resizeOptions.VolumeSpec.PersistentVolume
98+
if pv == nil {
99+
return false, fmt.Errorf("Expander.NodeExpand failed to find associated PersistentVolume for plugin %s", c.GetPluginName())
100+
}
101+
102+
opts := csiResizeOptions{
103+
volumePath: resizeOptions.DeviceMountPath,
104+
stagingTargetPath: resizeOptions.DeviceStagePath,
105+
volumeID: csiSource.VolumeHandle,
106+
newSize: resizeOptions.NewSize,
107+
fsType: csiSource.FSType,
108+
accessMode: api.ReadWriteOnce,
109+
mountOptions: pv.Spec.MountOptions,
110+
}
111+
98112
if !fsVolume {
99-
volumeTargetPath = resizeOptions.DevicePath
113+
// for block volumes the volumePath in CSI NodeExpandvolumeRequest is
114+
// basically same as DevicePath because block devices are not mounted and hence
115+
// DeviceMountPath does not get populated in resizeOptions.DeviceMountPath
116+
opts.volumePath = resizeOptions.DevicePath
117+
opts.fsType = fsTypeBlockName
118+
}
119+
120+
if pv.Spec.AccessModes != nil {
121+
opts.accessMode = pv.Spec.AccessModes[0]
100122
}
101123

102-
_, err = csClient.NodeExpandVolume(ctx, csiSource.VolumeHandle, volumeTargetPath, resizeOptions.NewSize)
124+
_, err = csClient.NodeExpandVolume(ctx, opts)
103125
if err != nil {
104126
return false, fmt.Errorf("Expander.NodeExpand failed to expand the volume : %v", err)
105127
}

0 commit comments

Comments
 (0)