Skip to content

Commit e7b9160

Browse files
Copilotandyzhangx
andcommitted
Implement NodeStageVolume to properly propagate mount options
- Add STAGE_UNSTAGE_VOLUME capability to driver - Implement NodeStageVolume to mount NFS share at staging path with all mount options - Modify NodePublishVolume to bind mount from staging to target and remount with security options - Implement NodeUnstageVolume to properly unmount staged volumes - Add comprehensive tests for staging and unstaging functionality - Maintain backward compatibility with legacy direct mount path Co-authored-by: andyzhangx <[email protected]>
1 parent 1088194 commit e7b9160

File tree

3 files changed

+333
-25
lines changed

3 files changed

+333
-25
lines changed

pkg/nfs/nfs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ func NewDriver(options *DriverOptions) *Driver {
113113
n.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
114114
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
115115
csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
116+
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
116117
csi.NodeServiceCapability_RPC_UNKNOWN,
117118
})
118119
n.volumeLocks = NewVolumeLocks()

pkg/nfs/nodeserver.go

Lines changed: 176 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
5555
if len(targetPath) == 0 {
5656
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
5757
}
58+
stagingTargetPath := req.GetStagingTargetPath()
5859

5960
lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
6061
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
@@ -105,15 +106,6 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
105106
if baseDir == "" {
106107
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramShare))
107108
}
108-
server = getServerFromSource(server)
109-
source := fmt.Sprintf("%s:%s", server, baseDir)
110-
if subDir != "" {
111-
// replace pv/pvc name namespace metadata in subDir
112-
subDir = replaceWithMap(subDir, subDirReplaceMap)
113-
114-
source = strings.TrimRight(source, "/")
115-
source = fmt.Sprintf("%s/%s", source, subDir)
116-
}
117109

118110
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
119111
if err != nil {
@@ -130,19 +122,67 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
130122
return &csi.NodePublishVolumeResponse{}, nil
131123
}
132124

133-
klog.V(2).Infof("NodePublishVolume: volumeID(%v) source(%s) targetPath(%s) mountflags(%v)", volumeID, source, targetPath, mountOptions)
134-
execFunc := func() error {
135-
return ns.mounter.Mount(source, targetPath, "nfs", mountOptions)
136-
}
137-
timeoutFunc := func() error { return fmt.Errorf("time out") }
138-
if err := WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil {
139-
if os.IsPermission(err) {
140-
return nil, status.Error(codes.PermissionDenied, err.Error())
125+
// If stagingTargetPath is provided, bind mount from staging to target
126+
// and remount with security options to ensure they are applied
127+
if stagingTargetPath != "" {
128+
klog.V(2).Infof("NodePublishVolume: volumeID(%v) bind mounting from stagingPath(%s) to targetPath(%s) with mountflags(%v)", volumeID, stagingTargetPath, targetPath, mountOptions)
129+
130+
// Perform bind mount
131+
if err := ns.mounter.Mount(stagingTargetPath, targetPath, "", []string{"bind"}); err != nil {
132+
if os.IsPermission(err) {
133+
return nil, status.Error(codes.PermissionDenied, err.Error())
134+
}
135+
return nil, status.Error(codes.Internal, err.Error())
141136
}
142-
if strings.Contains(err.Error(), "invalid argument") {
143-
return nil, status.Error(codes.InvalidArgument, err.Error())
137+
138+
// Remount with security options to ensure they are applied to the bind mount
139+
// Extract security-related mount options that need to be re-applied
140+
securityOpts := []string{"remount"}
141+
for _, opt := range mountOptions {
142+
// Include security options and readonly flag
143+
if opt == "noexec" || opt == "nosuid" || opt == "nodev" || opt == "ro" {
144+
securityOpts = append(securityOpts, opt)
145+
}
146+
}
147+
148+
// Only remount if there are security options to apply
149+
if len(securityOpts) > 1 {
150+
klog.V(2).Infof("NodePublishVolume: remounting targetPath(%s) with security options(%v)", targetPath, securityOpts)
151+
if err := ns.mounter.Mount("", targetPath, "", securityOpts); err != nil {
152+
// Attempt to cleanup the bind mount on failure
153+
mount.CleanupMountPoint(targetPath, ns.mounter, false)
154+
if os.IsPermission(err) {
155+
return nil, status.Error(codes.PermissionDenied, err.Error())
156+
}
157+
return nil, status.Error(codes.Internal, err.Error())
158+
}
159+
}
160+
} else {
161+
// Legacy path: direct NFS mount (for backward compatibility when staging is not used)
162+
server = getServerFromSource(server)
163+
source := fmt.Sprintf("%s:%s", server, baseDir)
164+
if subDir != "" {
165+
// replace pv/pvc name namespace metadata in subDir
166+
subDir = replaceWithMap(subDir, subDirReplaceMap)
167+
168+
source = strings.TrimRight(source, "/")
169+
source = fmt.Sprintf("%s/%s", source, subDir)
170+
}
171+
172+
klog.V(2).Infof("NodePublishVolume: volumeID(%v) source(%s) targetPath(%s) mountflags(%v)", volumeID, source, targetPath, mountOptions)
173+
execFunc := func() error {
174+
return ns.mounter.Mount(source, targetPath, "nfs", mountOptions)
175+
}
176+
timeoutFunc := func() error { return fmt.Errorf("time out") }
177+
if err := WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil {
178+
if os.IsPermission(err) {
179+
return nil, status.Error(codes.PermissionDenied, err.Error())
180+
}
181+
if strings.Contains(err.Error(), "invalid argument") {
182+
return nil, status.Error(codes.InvalidArgument, err.Error())
183+
}
184+
return nil, status.Error(codes.Internal, err.Error())
144185
}
145-
return nil, status.Error(codes.Internal, err.Error())
146186
}
147187

148188
if mountPermissions > 0 {
@@ -152,7 +192,7 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
152192
} else {
153193
klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
154194
}
155-
klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
195+
klog.V(2).Infof("volume(%s) mounted to %s successfully", volumeID, targetPath)
156196
return &csi.NodePublishVolumeResponse{}, nil
157197
}
158198

@@ -286,13 +326,124 @@ func (ns *NodeServer) NodeGetVolumeStats(_ context.Context, req *csi.NodeGetVolu
286326
}
287327

288328
// NodeUnstageVolume unstage volume
289-
func (ns *NodeServer) NodeUnstageVolume(_ context.Context, _ *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
290-
return nil, status.Error(codes.Unimplemented, "")
329+
func (ns *NodeServer) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
330+
volumeID := req.GetVolumeId()
331+
if len(volumeID) == 0 {
332+
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
333+
}
334+
stagingTargetPath := req.GetStagingTargetPath()
335+
if len(stagingTargetPath) == 0 {
336+
return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
337+
}
338+
339+
lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath)
340+
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
341+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
342+
}
343+
defer ns.Driver.volumeLocks.Release(lockKey)
344+
345+
klog.V(2).Infof("NodeUnstageVolume: unmounting volume %s on %s", volumeID, stagingTargetPath)
346+
var err error
347+
extensiveMountPointCheck := true
348+
forceUnmounter, ok := ns.mounter.(mount.MounterForceUnmounter)
349+
if ok {
350+
klog.V(2).Infof("force unmount %s on %s", volumeID, stagingTargetPath)
351+
err = mount.CleanupMountWithForce(stagingTargetPath, forceUnmounter, extensiveMountPointCheck, 30*time.Second)
352+
} else {
353+
err = mount.CleanupMountPoint(stagingTargetPath, ns.mounter, extensiveMountPointCheck)
354+
}
355+
if err != nil {
356+
return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err)
357+
}
358+
klog.V(2).Infof("NodeUnstageVolume: unmount volume %s on %s successfully", volumeID, stagingTargetPath)
359+
360+
return &csi.NodeUnstageVolumeResponse{}, nil
291361
}
292362

293363
// NodeStageVolume stage volume
294-
func (ns *NodeServer) NodeStageVolume(_ context.Context, _ *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
295-
return nil, status.Error(codes.Unimplemented, "")
364+
func (ns *NodeServer) NodeStageVolume(_ context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
365+
volCap := req.GetVolumeCapability()
366+
if volCap == nil {
367+
return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
368+
}
369+
volumeID := req.GetVolumeId()
370+
if len(volumeID) == 0 {
371+
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
372+
}
373+
stagingTargetPath := req.GetStagingTargetPath()
374+
if len(stagingTargetPath) == 0 {
375+
return nil, status.Error(codes.InvalidArgument, "Staging target path not provided")
376+
}
377+
378+
lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath)
379+
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
380+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
381+
}
382+
defer ns.Driver.volumeLocks.Release(lockKey)
383+
384+
mountOptions := volCap.GetMount().GetMountFlags()
385+
386+
var server, baseDir, subDir string
387+
for k, v := range req.GetVolumeContext() {
388+
switch strings.ToLower(k) {
389+
case paramServer:
390+
server = v
391+
case paramShare:
392+
baseDir = v
393+
case paramSubDir:
394+
subDir = v
395+
case mountOptionsField:
396+
if v != "" {
397+
mountOptions = append(mountOptions, v)
398+
}
399+
}
400+
}
401+
402+
if server == "" {
403+
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramServer))
404+
}
405+
if baseDir == "" {
406+
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramShare))
407+
}
408+
server = getServerFromSource(server)
409+
source := fmt.Sprintf("%s:%s", server, baseDir)
410+
if subDir != "" {
411+
source = strings.TrimRight(source, "/")
412+
source = fmt.Sprintf("%s/%s", source, subDir)
413+
}
414+
415+
notMnt, err := ns.mounter.IsLikelyNotMountPoint(stagingTargetPath)
416+
if err != nil {
417+
if os.IsNotExist(err) {
418+
if err := os.MkdirAll(stagingTargetPath, os.FileMode(0755)); err != nil {
419+
return nil, status.Error(codes.Internal, err.Error())
420+
}
421+
notMnt = true
422+
} else {
423+
return nil, status.Error(codes.Internal, err.Error())
424+
}
425+
}
426+
if !notMnt {
427+
return &csi.NodeStageVolumeResponse{}, nil
428+
}
429+
430+
klog.V(2).Infof("NodeStageVolume: volumeID(%v) source(%s) stagingTargetPath(%s) mountflags(%v)", volumeID, source, stagingTargetPath, mountOptions)
431+
execFunc := func() error {
432+
return ns.mounter.Mount(source, stagingTargetPath, "nfs", mountOptions)
433+
}
434+
timeoutFunc := func() error { return fmt.Errorf("time out") }
435+
if err := WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil {
436+
if os.IsPermission(err) {
437+
return nil, status.Error(codes.PermissionDenied, err.Error())
438+
}
439+
if strings.Contains(err.Error(), "invalid argument") {
440+
return nil, status.Error(codes.InvalidArgument, err.Error())
441+
}
442+
return nil, status.Error(codes.Internal, err.Error())
443+
}
444+
445+
klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, stagingTargetPath)
446+
return &csi.NodeStageVolumeResponse{}, nil
296447
}
297448

298449
// NodeExpandVolume node expand volume

0 commit comments

Comments
 (0)