Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions cmd/csi-rclone-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,34 +48,10 @@ func main() {
Use: "run",
Short: "Start the CSI driver.",
}
root.AddCommand(runCmd)
nodeCommandLineParameters(runCmd)
controllerCommandLineParameters(runCmd)

runNode := &cobra.Command{
Use: "node",
Short: "Start the CSI driver node service - expected to run in a daemonset on every node.",
Run: func(cmd *cobra.Command, args []string) {
handleNode()
},
}
runNode.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id")
runNode.MarkPersistentFlagRequired("nodeid")
runNode.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint")
runNode.MarkPersistentFlagRequired("endpoint")
runNode.PersistentFlags().StringVar(&cacheDir, "cachedir", "", "cache dir")
runNode.PersistentFlags().StringVar(&cacheSize, "cachesize", "", "cache size")
runCmd.AddCommand(runNode)
runController := &cobra.Command{
Use: "controller",
Short: "Start the CSI driver controller.",
Run: func(cmd *cobra.Command, args []string) {
handleController()
},
}
runController.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id")
runController.MarkPersistentFlagRequired("nodeid")
runController.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint")
runController.MarkPersistentFlagRequired("endpoint")
runCmd.AddCommand(runController)
root.AddCommand(runCmd)

versionCmd := &cobra.Command{
Use: "version",
Expand Down Expand Up @@ -123,6 +99,23 @@ func handleNode() {
}
}

// nodeCommandLineParameters registers the "node" subcommand on runCmd.
// The subcommand starts the CSI driver node service; "nodeid" and
// "endpoint" are required flags, "cachedir" and "cachesize" are optional.
func nodeCommandLineParameters(runCmd *cobra.Command) {
	nodeCmd := &cobra.Command{
		Use:   "node",
		Short: "Start the CSI driver node service - expected to run in a daemonset on every node.",
		Run: func(cmd *cobra.Command, args []string) {
			handleNode()
		},
	}

	flags := nodeCmd.PersistentFlags()
	flags.StringVar(&nodeID, "nodeid", "", "node id")
	flags.StringVar(&endpoint, "endpoint", "", "CSI endpoint")
	flags.StringVar(&cacheDir, "cachedir", "", "cache dir")
	flags.StringVar(&cacheSize, "cachesize", "", "cache size")
	nodeCmd.MarkPersistentFlagRequired("nodeid")
	nodeCmd.MarkPersistentFlagRequired("endpoint")

	runCmd.AddCommand(nodeCmd)
}

func handleController() {
d := rclone.NewDriver(nodeID, endpoint)
cs := rclone.NewControllerServer(d.CSIDriver)
Expand All @@ -134,6 +127,21 @@ func handleController() {
}
}

// controllerCommandLineParameters registers the "controller" subcommand on
// runCmd. The subcommand starts the CSI driver controller; "nodeid" and
// "endpoint" are required flags.
func controllerCommandLineParameters(runCmd *cobra.Command) {
	controllerCmd := &cobra.Command{
		Use:   "controller",
		Short: "Start the CSI driver controller.",
		Run: func(cmd *cobra.Command, args []string) {
			handleController()
		},
	}

	flags := controllerCmd.PersistentFlags()
	flags.StringVar(&nodeID, "nodeid", "", "node id")
	flags.StringVar(&endpoint, "endpoint", "", "CSI endpoint")
	controllerCmd.MarkPersistentFlagRequired("nodeid")
	controllerCmd.MarkPersistentFlagRequired("endpoint")

	runCmd.AddCommand(controllerCmd)
}

// unmountOldVols is used to unmount volumes after a restart on a node
func unmountOldVols() error {
const mountType = "fuse.rclone"
Expand Down
28 changes: 24 additions & 4 deletions pkg/rclone/driver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rclone

import (
"context"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -78,14 +79,27 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st
}
rcloneOps := NewRclone(kubeClient, rclonePort, cacheDir, cacheSize)

return &nodeServer{
// Use kubelet plugin directory for state persistence
stateFile := "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a big deal, but it does not hurt to make this a constant

Suggested change
stateFile := "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json"
const stateFile string = "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will find this (and the comment) moved directly into the instantiation of nodeServer just below, since that is the only place the variable is used; see the commit in #78.


ns := &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver),
mounter: &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: utilexec.New(),
},
RcloneOps: rcloneOps,
}, nil
RcloneOps: rcloneOps,
mountedVolumes: make(map[string]*MountedVolume),
mutex: &sync.Mutex{},
stateFile: stateFile,
}

// Load persisted state on startup
if err := ns.loadState(); err != nil {
klog.Warningf("Failed to load persisted volume state: %v", err)
}

return ns, nil
}

func NewControllerServer(csiDriver *csicommon.CSIDriver) *controllerServer {
Expand Down Expand Up @@ -139,7 +153,13 @@ func (d *Driver) Run() error {
)
d.server = s
if d.ns != nil && d.ns.RcloneOps != nil {
return d.ns.RcloneOps.Run()
onDaemonReady := func() error {
if d.ns != nil {
return d.ns.remountTrackedVolumes(context.Background())
}
return nil
}
return d.ns.RcloneOps.Run(onDaemonReady)
Comment on lines +156 to +162
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Here it is better to pass in a context to the enclosing function and propagate that into remountTrackedVolumes. And an even better thing is to wrap the context passed into Run into a timeout so that if remounting the volumes takes a long time the server can give up and start.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can even decide whether to have a timeout for the total remount of all storages or just make a new timeout context for each call to Mount for each mountpoint. And you could even do both.

}
s.Wait()
return nil
Expand Down
160 changes: 160 additions & 0 deletions pkg/rclone/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ package rclone

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"

"gopkg.in/ini.v1"
Expand All @@ -37,6 +40,23 @@ type nodeServer struct {
*csicommon.DefaultNodeServer
mounter *mount.SafeFormatAndMount
RcloneOps Operations

// Track mounted volumes for automatic remounting
mountedVolumes map[string]*MountedVolume
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why pointer here and not just map[string]MountedVolume?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume it allows for the map to stay small, therefore making it simpler and faster to reallocate when needed as we append often to it.

The MountedVolume struct is not that small, also this decouples it a bit more from the map.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You really should not overthink that in Go. This map will not have intensive use, so it should stay as simple as it can get. In Go, this means using values as much as possible.

mutex *sync.Mutex
stateFile string
}

// MountedVolume records everything needed to remount a volume after the node
// plugin restarts. It is serialized to the node server's state file as JSON,
// so all fields must remain exported.
type MountedVolume struct {
	VolumeId   string // CSI volume ID used as the tracking key
	TargetPath string // kubelet target path the volume is mounted at
	Remote     string // rclone remote name
	RemotePath string // path within the rclone remote
	ConfigData string // rclone config contents passed to Mount
	ReadOnly   bool
	Parameters map[string]string // mount flags/parameters passed to Mount
	// SecretName/SecretNamespace: presumably identify the k8s secret backing
	// the rclone config — not read in the visible code; confirm before relying on them.
	SecretName      string
	SecretNamespace string
}

// Mounting Volume (Preparation)
Expand Down Expand Up @@ -141,6 +161,10 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
return nil, status.Error(codes.Internal, err.Error())
}

// Track the mounted volume for automatic remounting
ns.trackMountedVolume(volumeId, targetPath, remote, remotePath, configData, readOnly, flags, secretName, secretNamespace)

// err = ns.WaitForMountAvailable(targetPath)
// if err != nil {
// return nil, status.Error(codes.Internal, err.Error())
Expand Down Expand Up @@ -323,6 +347,10 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
if err := ns.RcloneOps.Unmount(ctx, req.GetVolumeId(), targetPath); err != nil {
klog.Warningf("Unmounting volume failed: %s", err)
}

// Remove the volume from tracking
ns.removeTrackedVolume(req.GetVolumeId())

mount.CleanupMountPoint(req.GetTargetPath(), ns.mounter, false)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
Expand All @@ -344,6 +372,84 @@ func (*nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolu
return nil, status.Errorf(codes.Unimplemented, "method NodeExpandVolume not implemented")
}

// Track mounted volume for automatic remounting
func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePath, configData string, readOnly bool, parameters map[string]string, secretName, secretNamespace string) {
ns.mutex.Lock()

ns.mountedVolumes[volumeId] = &MountedVolume{
VolumeId: volumeId,
TargetPath: targetPath,
Remote: remote,
RemotePath: remotePath,
ConfigData: configData,
ReadOnly: readOnly,
Parameters: parameters,
SecretName: secretName,
SecretNamespace: secretNamespace,
}
// Can't use defer here, as persistState also takes the mutex, and would fail as the Unlock happens after it returns
ns.mutex.Unlock()
klog.Infof("Tracked mounted volume %s at path %s", volumeId, targetPath)

if err := ns.persistState(); err != nil {
klog.Errorf("Failed to persist volume state: %v", err)
}
Comment on lines +391 to +396
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: I think that updating the mountedVolumes map and writing to file should be done in one single lock/unlock cycle, not two. What could happen now is that as soon as you unlock the mutex, it can be picked up by another trackMountedVolume call. This can probably cause weird behaviour or it can cause us to miss persisting the state to file.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you could do is make persistState be a fully standalone private function that just takes a map and a file name and writes that file. And the locking happens all in the methods that just call the function.

You could even go as far as calling that function unsafePersistState and/or also making it clear in the docstring that it is not goroutine safe.

}

// removeTrackedVolume drops a volume from ns.mountedVolumes once it has been
// unmounted, then persists the updated state to disk.
//
// NOTE(review): the map update and persistState run in two separate
// lock/unlock cycles (persistState takes ns.mutex itself), so a concurrent
// track/remove between the two calls can cause the persisted file to lag the
// in-memory map — confirm this is acceptable.
func (ns *nodeServer) removeTrackedVolume(volumeId string) {
	ns.mutex.Lock()

	delete(ns.mountedVolumes, volumeId)
	// Can't use defer here, as persistState also takes the mutex, and would fail as the Unlock happens after it returns
	ns.mutex.Unlock()
	klog.Infof("Removed tracked volume %s", volumeId)

	if err := ns.persistState(); err != nil {
		klog.Errorf("Failed to persist volume state: %v", err)
	}
}

// Automatically remount all tracked volumes after daemon restart
func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error {
ns.mutex.Lock()
defer ns.mutex.Unlock()

if len(ns.mountedVolumes) == 0 {
klog.Info("No tracked volumes to remount")
return nil
}

klog.Infof("Remounting %d tracked volumes", len(ns.mountedVolumes))

for volumeId, mv := range ns.mountedVolumes {
Comment on lines +414 to +425
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: This is similar to the other cases. It is safer to load the text file and remount in one lock/unlock cycle.

klog.Infof("Remounting volume %s to %s", volumeId, mv.TargetPath)

// Create the mount directory if it doesn't exist
if err := os.MkdirAll(mv.TargetPath, 0750); err != nil {
klog.Errorf("Failed to create mount directory %s: %v", mv.TargetPath, err)
continue
}

// Remount the volume
rcloneVol := &RcloneVolume{
ID: mv.VolumeId,
Remote: mv.Remote,
RemotePath: mv.RemotePath,
}

err := ns.RcloneOps.Mount(ctx, rcloneVol, mv.TargetPath, mv.ConfigData, mv.ReadOnly, mv.Parameters)
if err != nil {
klog.Errorf("Failed to remount volume %s: %v", volumeId, err)
// Don't return error here - continue with other volumes
} else {
klog.Infof("Successfully remounted volume %s", volumeId)
}
}

return nil
}

func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error {
for {
select {
Expand All @@ -357,3 +463,57 @@ func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error {
}
}
}

// persistState serializes ns.mountedVolumes to ns.stateFile as JSON so the
// set of mounted volumes survives a node plugin restart. An empty stateFile
// disables persistence. It acquires ns.mutex itself, so callers must NOT
// hold the lock when calling it (see trackMountedVolume/removeTrackedVolume).
func (ns *nodeServer) persistState() error {
	if ns.stateFile == "" {
		return nil
	}

	// Ensure the parent directory exists before writing.
	if err := os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil {
		return fmt.Errorf("creating state directory: %w", err)
	}

	ns.mutex.Lock()
	defer ns.mutex.Unlock()

	data, err := json.Marshal(ns.mountedVolumes)
	if err != nil {
		return fmt.Errorf("marshaling volume state: %w", err)
	}

	// 0600: the state embeds rclone config data, keep it owner-readable only.
	if err := os.WriteFile(ns.stateFile, data, 0600); err != nil {
		return fmt.Errorf("writing state file: %w", err)
	}

	klog.Infof("Persisted volume state to %s", ns.stateFile)
	return nil
}

// loadState restores ns.mountedVolumes from ns.stateFile (written by
// persistState). A missing file or an empty stateFile is not an error — the
// server simply starts with no tracked volumes. On success the in-memory map
// is replaced wholesale by the persisted one.
func (ns *nodeServer) loadState() error {
	if ns.stateFile == "" {
		return nil
	}

	ns.mutex.Lock()
	defer ns.mutex.Unlock()

	data, err := os.ReadFile(ns.stateFile)
	if err != nil {
		if os.IsNotExist(err) {
			klog.Info("No persisted volume state found, starting fresh")
			return nil
		}
		return fmt.Errorf("reading state file: %w", err)
	}

	var volumes map[string]*MountedVolume
	if err := json.Unmarshal(data, &volumes); err != nil {
		return fmt.Errorf("unmarshaling volume state: %w", err)
	}
	if volumes == nil {
		// A state file containing "null" unmarshals to a nil map; keep the
		// map non-nil so later writes in trackMountedVolume cannot panic.
		volumes = make(map[string]*MountedVolume)
	}

	ns.mountedVolumes = volumes
	klog.Infof("Loaded %d tracked volumes from %s", len(ns.mountedVolumes), ns.stateFile)
	return nil
}
Comment on lines +493 to +519
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): I am being incredibly picky here, but it is not a bad idea to do one of the following:

  1. Note in the docstring that this will overwrite the map of volumes in memory
  2. Fail if the map in memory is not empty but this is called (maybe this is a bit too extreme)
  3. Merge. This may require looping through the map from disk. But you can also try to unmarshal directly into ns.mountedVolumes but I am not sure really if this is the same as overwrite or it will just update the fields it can find.

9 changes: 7 additions & 2 deletions pkg/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Operations interface {
Unmount(ctx context.Context, volumeId string, targetPath string) error
GetVolumeById(ctx context.Context, volumeId string) (*RcloneVolume, error)
Cleanup() error
Run() error
Run(onDaemonReady func() error) error
}

type Rclone struct {
Expand Down Expand Up @@ -472,11 +472,16 @@ func (r *Rclone) start_daemon() error {
return nil
}

func (r *Rclone) Run() error {
// Run starts the rclone daemon, invokes the optional onDaemonReady callback
// once start_daemon has returned, and then blocks until the daemon process
// exits. A callback error is logged but does not stop the daemon.
func (r *Rclone) Run(onDaemonReady func() error) error {
	if err := r.start_daemon(); err != nil {
		return err
	}

	if onDaemonReady != nil {
		if cbErr := onDaemonReady(); cbErr != nil {
			klog.Warningf("Error in onDaemonReady callback: %v", cbErr)
		}
	}

	// blocks until the rclone daemon is stopped
	return r.daemonCmd.Wait()
}
Expand Down