From d61d880f95c2b8020f5bfcd42aa9c59cadab5a69 Mon Sep 17 00:00:00 2001 From: Brian Kanya Date: Tue, 25 Nov 2025 13:46:19 +0100 Subject: [PATCH 1/6] Re-mount volumes on a restart or update #72 --- pkg/rclone/driver.go | 26 ++++++- pkg/rclone/nodeserver.go | 158 +++++++++++++++++++++++++++++++++++++++ pkg/rclone/rclone.go | 9 ++- 3 files changed, 187 insertions(+), 6 deletions(-) diff --git a/pkg/rclone/driver.go b/pkg/rclone/driver.go index 60fd38ba..e4f771ee 100644 --- a/pkg/rclone/driver.go +++ b/pkg/rclone/driver.go @@ -78,14 +78,26 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st } rcloneOps := NewRclone(kubeClient, rclonePort, cacheDir, cacheSize) - return &nodeServer{ + // Use kubelet plugin directory for state persistence + stateFile := "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json" + + ns := &nodeServer{ DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver), mounter: &mount.SafeFormatAndMount{ Interface: mount.New(""), Exec: utilexec.New(), }, - RcloneOps: rcloneOps, - }, nil + RcloneOps: rcloneOps, + mountedVolumes: make(map[string]*MountedVolume), + stateFile: stateFile, + } + + // Load persisted state on startup + if err := ns.loadState(); err != nil { + klog.Warningf("Failed to load persisted volume state: %v", err) + } + + return ns, nil } func NewControllerServer(csiDriver *csicommon.CSIDriver) *controllerServer { @@ -139,7 +151,13 @@ func (d *Driver) Run() error { ) d.server = s if d.ns != nil && d.ns.RcloneOps != nil { - return d.ns.RcloneOps.Run() + onDaemonReady := func() error { + if d.ns != nil { + return d.ns.remountTrackedVolumes(context.Background()) + } + return nil + } + return d.ns.RcloneOps.Run(onDaemonReady) } s.Wait() return nil diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index ef035016..b31def14 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -7,10 +7,13 @@ package rclone import ( "bytes" + "encoding/json" "errors" "fmt" "os" + "path/filepath" "strings" + "sync" "time" "gopkg.in/ini.v1" @@ -37,6 +40,23 @@ type nodeServer struct { *csicommon.DefaultNodeServer mounter *mount.SafeFormatAndMount RcloneOps Operations + + // Track mounted volumes for automatic remounting + mountedVolumes map[string]*MountedVolume + mutex sync.RWMutex + stateFile string +} + +type MountedVolume struct { + VolumeId string + TargetPath string + Remote string + RemotePath string + ConfigData string + ReadOnly bool + Parameters map[string]string + SecretName string + SecretNamespace string } // Mounting Volume (Preparation) @@ -141,6 +161,10 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } return nil, status.Error(codes.Internal, err.Error()) } + + // Track the mounted volume for automatic remounting + ns.trackMountedVolume(volumeId, targetPath, remote, remotePath, configData, readOnly, flags, secretName, secretNamespace) + // err = ns.WaitForMountAvailable(targetPath) // if err != nil { // return nil, status.Error(codes.Internal, err.Error()) @@ -323,6 +347,10 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu if err := ns.RcloneOps.Unmount(ctx, req.GetVolumeId(), targetPath); err != nil { klog.Warningf("Unmounting volume failed: %s", err) } + + // Remove the volume from tracking + ns.removeTrackedVolume(req.GetVolumeId()) + mount.CleanupMountPoint(req.GetTargetPath(), ns.mounter, false) return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -344,6 +372,82 @@ func (*nodeServer) NodeExpandVolume(ctx context.Context, req 
*csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method NodeExpandVolume not implemented") } +// Track mounted volume for automatic remounting +func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePath, configData string, readOnly bool, parameters map[string]string, secretName, secretNamespace string) { + ns.mutex.Lock() + defer ns.mutex.Unlock() + + ns.mountedVolumes[volumeId] = &MountedVolume{ + VolumeId: volumeId, + TargetPath: targetPath, + Remote: remote, + RemotePath: remotePath, + ConfigData: configData, + ReadOnly: readOnly, + Parameters: parameters, + SecretName: secretName, + SecretNamespace: secretNamespace, + } + klog.Infof("Tracked mounted volume %s at path %s", volumeId, targetPath) + + if err := ns.persistState(); err != nil { + klog.Errorf("Failed to persist volume state: %v", err) + } +} + +// Remove tracked volume when unmounted +func (ns *nodeServer) removeTrackedVolume(volumeId string) { + ns.mutex.Lock() + defer ns.mutex.Unlock() + + delete(ns.mountedVolumes, volumeId) + klog.Infof("Removed tracked volume %s", volumeId) + + if err := ns.persistState(); err != nil { + klog.Errorf("Failed to persist volume state: %v", err) + } +} + +// Automatically remount all tracked volumes after daemon restart +func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { + ns.mutex.RLock() + defer ns.mutex.RUnlock() + + if len(ns.mountedVolumes) == 0 { + klog.Info("No tracked volumes to remount") + return nil + } + + klog.Infof("Remounting %d tracked volumes", len(ns.mountedVolumes)) + + for volumeId, mv := range ns.mountedVolumes { + klog.Infof("Remounting volume %s to %s", volumeId, mv.TargetPath) + + // Create the mount directory if it doesn't exist + if err := os.MkdirAll(mv.TargetPath, 0750); err != nil { + klog.Errorf("Failed to create mount directory %s: %v", mv.TargetPath, err) + continue + } + + // Remount the volume + rcloneVol := &RcloneVolume{ + ID: mv.VolumeId, + Remote: mv.Remote, + RemotePath: mv.RemotePath, + } + + err := ns.RcloneOps.Mount(ctx, rcloneVol, mv.TargetPath, mv.ConfigData, mv.ReadOnly, mv.Parameters) + if err != nil { + klog.Errorf("Failed to remount volume %s: %v", volumeId, err) + // Don't return error here - continue with other volumes + } else { + klog.Infof("Successfully remounted volume %s", volumeId) + } + } + + return nil +} + func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error { for { select { @@ -357,3 +461,57 @@ func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error { } } } + +// Persist volume state to disk +func (ns *nodeServer) persistState() error { + ns.mutex.RLock() + defer ns.mutex.RUnlock() + + if ns.stateFile == "" { + return nil + } + + data, err := json.Marshal(ns.mountedVolumes) + if err != nil { + return fmt.Errorf("failed to marshal volume state: %v", err) + } + + if err := os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil { + return fmt.Errorf("failed to create state directory: %v", err) + } + + if err := os.WriteFile(ns.stateFile, data, 0600); err != nil { + return fmt.Errorf("failed to write state file: %v", err) + } + + klog.Infof("Persisted volume state to %s", ns.stateFile) + return nil +} + +// Load volume state from disk +func (ns *nodeServer) loadState() error { + ns.mutex.Lock() + defer ns.mutex.Unlock() + + if ns.stateFile == "" { + return nil + } + + data, err := os.ReadFile(ns.stateFile) + if err != nil { + if os.IsNotExist(err) { + klog.Info("No persisted volume state found, starting fresh") + return nil + } + return 
fmt.Errorf("failed to read state file: %v", err) + } + + var volumes map[string]*MountedVolume + if err := json.Unmarshal(data, &volumes); err != nil { + return fmt.Errorf("failed to unmarshal volume state: %v", err) + } + + ns.mountedVolumes = volumes + klog.Infof("Loaded %d tracked volumes from %s", len(ns.mountedVolumes), ns.stateFile) + return nil +} diff --git a/pkg/rclone/rclone.go b/pkg/rclone/rclone.go index 9b325084..264b2719 100644 --- a/pkg/rclone/rclone.go +++ b/pkg/rclone/rclone.go @@ -34,7 +34,7 @@ type Operations interface { Unmount(ctx context.Context, volumeId string, targetPath string) error GetVolumeById(ctx context.Context, volumeId string) (*RcloneVolume, error) Cleanup() error - Run() error + Run(onDaemonReady func() error) error } type Rclone struct { @@ -472,11 +472,16 @@ func (r *Rclone) start_daemon() error { return nil } -func (r *Rclone) Run() error { +func (r *Rclone) Run(onDaemonReady func() error) error { err := r.start_daemon() if err != nil { return err } + if onDaemonReady != nil { + if err := onDaemonReady(); err != nil { + klog.Warningf("Error in onDaemonReady callback: %v", err) + } + } // blocks until the rclone daemon is stopped return r.daemonCmd.Wait() } From f04438a0f121dec068cc2ddff3cb8885949a3440 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Mon, 1 Dec 2025 05:47:40 +0000 Subject: [PATCH 2/6] fix_original_commit * Ensure the mutex is not copied, even when the nodeServer is copied by storing a pointer to the mutex, instead of the mutex itself * Use Mutex instead of RWMutex, as having two readers of the variable at the same time means we are going to write the state at the same time, corrupting the state file on storage. * Mutex / RWMutex are not recursive / re-entrant in Go, so in two cases we do not call `Unlock()` through `defer` as `persistState()` also takes the lock. * As a rule of thumb, Locking a Mutex should be as close as possible to the resource requiring it, to minimize the size of the critical section / the time spent holding the lock. 
--- pkg/rclone/driver.go | 4 ++- pkg/rclone/nodeserver.go | 64 +++++++++++++++++++++------------------- 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/pkg/rclone/driver.go b/pkg/rclone/driver.go index e4f771ee..989e642e 100644 --- a/pkg/rclone/driver.go +++ b/pkg/rclone/driver.go @@ -1,6 +1,7 @@ package rclone import ( + "context" "fmt" "net" "os" @@ -87,8 +88,9 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st Interface: mount.New(""), Exec: utilexec.New(), }, - RcloneOps: rcloneOps, + RcloneOps: rcloneOps, mountedVolumes: make(map[string]*MountedVolume), + mutex: &sync.Mutex{}, stateFile: stateFile, } diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index b31def14..47519d91 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -43,19 +43,19 @@ type nodeServer struct { // Track mounted volumes for automatic remounting mountedVolumes map[string]*MountedVolume - mutex sync.RWMutex + mutex *sync.Mutex stateFile string } type MountedVolume struct { - VolumeId string - TargetPath string - Remote string - RemotePath string - ConfigData string - ReadOnly bool - Parameters map[string]string - SecretName string + VolumeId string + TargetPath string + Remote string + RemotePath string + ConfigData string + ReadOnly bool + Parameters map[string]string + SecretName string SecretNamespace string } @@ -375,19 +375,20 @@ func (*nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolu // Track mounted volume for automatic remounting func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePath, configData string, readOnly bool, parameters map[string]string, secretName, secretNamespace string) { ns.mutex.Lock() - defer ns.mutex.Unlock() ns.mountedVolumes[volumeId] = &MountedVolume{ - VolumeId: volumeId, - TargetPath: targetPath, - Remote: remote, - RemotePath: remotePath, - ConfigData: configData, - ReadOnly: readOnly, - Parameters: parameters, - SecretName: secretName, + VolumeId: volumeId, + TargetPath: targetPath, + Remote: remote, + RemotePath: remotePath, + ConfigData: configData, + ReadOnly: readOnly, + Parameters: parameters, + SecretName: secretName, SecretNamespace: secretNamespace, } + // Can't use defer here: persistState also takes the mutex and would deadlock, since the deferred Unlock would only run after it returns + ns.mutex.Unlock() klog.Infof("Tracked mounted volume %s at path %s", volumeId, targetPath) if err := ns.persistState(); err != nil { @@ -398,9 +399,10 @@ func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePat // Remove tracked volume when unmounted func (ns *nodeServer) removeTrackedVolume(volumeId string) { ns.mutex.Lock() - defer ns.mutex.Unlock() delete(ns.mountedVolumes, volumeId) + // Can't use defer here: persistState also takes the mutex and would deadlock, since the deferred Unlock would only run after it returns + ns.mutex.Unlock() klog.Infof("Removed tracked volume %s", volumeId) if err := ns.persistState(); err != nil { @@ -410,8 +412,8 @@ func (ns *nodeServer) removeTrackedVolume(volumeId string) { // Automatically remount all tracked volumes after daemon restart func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { - ns.mutex.RLock() - defer ns.mutex.RUnlock() + ns.mutex.Lock() + defer ns.mutex.Unlock() if len(ns.mountedVolumes) == 0 { klog.Info("No tracked volumes to remount") @@ -464,22 +466,22 @@ func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error { } // Persist volume state to disk func (ns *nodeServer) persistState() 
error { - ns.mutex.RLock() - defer ns.mutex.RUnlock() - if ns.stateFile == "" { return nil } + if err := os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil { + return fmt.Errorf("failed to create state directory: %v", err) + } + + ns.mutex.Lock() + defer ns.mutex.Unlock() + data, err := json.Marshal(ns.mountedVolumes) if err != nil { return fmt.Errorf("failed to marshal volume state: %v", err) } - if err := os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil { - return fmt.Errorf("failed to create state directory: %v", err) - } - if err := os.WriteFile(ns.stateFile, data, 0600); err != nil { return fmt.Errorf("failed to write state file: %v", err) } @@ -490,13 +492,13 @@ func (ns *nodeServer) persistState() error { // Load volume state from disk func (ns *nodeServer) loadState() error { - ns.mutex.Lock() - defer ns.mutex.Unlock() - if ns.stateFile == "" { return nil } + ns.mutex.Lock() + defer ns.mutex.Unlock() + data, err := os.ReadFile(ns.stateFile) if err != nil { if os.IsNotExist(err) { From 9c1e8e0e7c9141f0d01490fd0f145a8a37167b90 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Tue, 2 Dec 2025 15:39:10 +0100 Subject: [PATCH 3/6] fix: Address comments --- .devcontainer/rclone/install.sh | 3 ++ pkg/rclone/driver.go | 13 ++++++-- pkg/rclone/nodeserver.go | 57 +++++++++++++-------------------- 3 files changed, 36 insertions(+), 37 deletions(-) diff --git a/.devcontainer/rclone/install.sh b/.devcontainer/rclone/install.sh index c9a86242..a972f212 100644 --- a/.devcontainer/rclone/install.sh +++ b/.devcontainer/rclone/install.sh @@ -19,3 +19,6 @@ rm -rf /tmp/rclone # Fix the $GOPATH folder chown -R "${USERNAME}:golang" /go chmod -R g+r+w /go + +# Make sure the default folders exist +mkdir -p /var/lib/kubelet/plugins/csi-rclone/ \ No newline at end of file diff --git a/pkg/rclone/driver.go b/pkg/rclone/driver.go index 989e642e..49423a78 100644 --- a/pkg/rclone/driver.go +++ b/pkg/rclone/driver.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "os" + "path/filepath" "sync" "github.com/SwissDataScienceCenter/csi-rclone/pkg/kube" @@ -89,13 +90,21 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st Exec: utilexec.New(), }, RcloneOps: rcloneOps, - mountedVolumes: make(map[string]*MountedVolume), + mountedVolumes: make(map[string]MountedVolume), mutex: &sync.Mutex{}, stateFile: stateFile, } + // Ensure the folder exists + if err = os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil { + return nil, fmt.Errorf("failed to create state directory: %v", err) + } + // Load persisted state on startup - if err := ns.loadState(); err != nil { + ns.mutex.Lock() + defer ns.mutex.Unlock() + + if ns.mountedVolumes, err = readVolumeMap(ns.stateFile); err != nil { klog.Warningf("Failed to load persisted volume state: %v", err) } diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index 47519d91..39616cb3 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -11,7 +11,6 @@ import ( "errors" "fmt" "os" - "path/filepath" "strings" "sync" "time" @@ -42,7 +41,7 @@ type nodeServer struct { RcloneOps Operations // Track mounted volumes for automatic remounting - mountedVolumes map[string]*MountedVolume + mountedVolumes map[string]MountedVolume mutex *sync.Mutex stateFile string } @@ -375,8 +374,9 @@ func (*nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolu // Track mounted volume for automatic remounting func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePath, configData string, readOnly bool, 
parameters map[string]string, secretName, secretNamespace string) { ns.mutex.Lock() + defer ns.mutex.Unlock() - ns.mountedVolumes[volumeId] = &MountedVolume{ + ns.mountedVolumes[volumeId] = MountedVolume{ VolumeId: volumeId, TargetPath: targetPath, Remote: remote, @@ -387,11 +387,9 @@ func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePat SecretName: secretName, SecretNamespace: secretNamespace, } - // Can't use defer here: persistState also takes the mutex and would deadlock, since the deferred Unlock would only run after it returns - ns.mutex.Unlock() klog.Infof("Tracked mounted volume %s at path %s", volumeId, targetPath) - if err := ns.persistState(); err != nil { + if err := writeVolumeMap(ns.stateFile, ns.mountedVolumes); err != nil { klog.Errorf("Failed to persist volume state: %v", err) } } @@ -399,13 +397,12 @@ func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePat // Remove tracked volume when unmounted func (ns *nodeServer) removeTrackedVolume(volumeId string) { ns.mutex.Lock() + defer ns.mutex.Unlock() delete(ns.mountedVolumes, volumeId) - // Can't use defer here: persistState also takes the mutex and would deadlock, since the deferred Unlock would only run after it returns - ns.mutex.Unlock() klog.Infof("Removed tracked volume %s", volumeId) - if err := ns.persistState(); err != nil { + if err := writeVolumeMap(ns.stateFile, ns.mountedVolumes); err != nil { klog.Errorf("Failed to persist volume state: %v", err) } } @@ -465,55 +462,45 @@ func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error { } // Persist volume state to disk -func (ns *nodeServer) persistState() error { - if ns.stateFile == "" { +func writeVolumeMap(filename string, volumes map[string]MountedVolume) error { + if filename == "" { return nil } - if err := os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil { - return fmt.Errorf("failed to create state directory: %v", err) - } - - ns.mutex.Lock() - defer ns.mutex.Unlock() - - data, err := json.Marshal(ns.mountedVolumes) + data, err := json.Marshal(volumes) if err != nil { return fmt.Errorf("failed to marshal volume state: %v", err) } - if err := os.WriteFile(ns.stateFile, data, 0600); err != nil { + if err := os.WriteFile(filename, data, 0600); err != nil { return fmt.Errorf("failed to write state file: %v", err) } - klog.Infof("Persisted volume state to %s", ns.stateFile) + klog.Infof("Persisted volume state to %s", filename) return nil } // Load volume state from disk -func (ns *nodeServer) loadState() error { - if ns.stateFile == "" { - return nil - } +func readVolumeMap(filename string) (map[string]MountedVolume, error) { + volumes := make(map[string]MountedVolume) - ns.mutex.Lock() - defer ns.mutex.Unlock() + if filename == "" { + return volumes, nil + } - data, err := os.ReadFile(ns.stateFile) + data, err := os.ReadFile(filename) if err != nil { if os.IsNotExist(err) { klog.Info("No persisted volume state found, starting fresh") - return nil + return volumes, nil } - return fmt.Errorf("failed to read state file: %v", err) + return volumes, fmt.Errorf("failed to read state file: %v", err) } - var volumes map[string]*MountedVolume if err := json.Unmarshal(data, &volumes); err != nil { - return fmt.Errorf("failed to unmarshal volume state: %v", err) + return nil, fmt.Errorf("failed to unmarshal volume state: %v", err) } - ns.mountedVolumes = volumes - klog.Infof("Loaded %d tracked volumes from %s", len(ns.mountedVolumes), ns.stateFile) - return nil + klog.Infof("Loaded %d tracked volumes from %s", len(volumes), filename) + 
return volumes, nil } From cf0229eff58e26e5a3e132802d3ec1e69837f1bd Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Wed, 3 Dec 2025 13:47:12 +0100 Subject: [PATCH 4/6] Address comment about using contexts and remounting in parallel --- pkg/rclone/nodeserver.go | 79 ++++++++++++++++++++++++++++++---------- 1 file changed, 59 insertions(+), 20 deletions(-) diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index 39616cb3..b9a7b891 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "os" + "runtime" "strings" "sync" "time" @@ -409,9 +410,17 @@ func (ns *nodeServer) removeTrackedVolume(volumeId string) { // Automatically remount all tracked volumes after daemon restart func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { + type mountResult struct { + volumeID string + err error + } + ns.mutex.Lock() defer ns.mutex.Unlock() + wg := new(sync.WaitGroup) + defer wg.Wait() + if len(ns.mountedVolumes) == 0 { klog.Info("No tracked volumes to remount") return nil @@ -419,32 +428,62 @@ func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { klog.Infof("Remounting %d tracked volumes", len(ns.mountedVolumes)) + // Limit the number of active workers to the number of CPU threads (arbitrarily chosen) + limits := make(chan bool, runtime.GOMAXPROCS(0)) + + results := make(chan mountResult, len(ns.mountedVolumes)) + ctxWithTimeout, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + for volumeId, mv := range ns.mountedVolumes { - klog.Infof("Remounting volume %s to %s", volumeId, mv.TargetPath) + wg.Add(1) + go func() { + defer wg.Done() - // Create the mount directory if it doesn't exist - if err := os.MkdirAll(mv.TargetPath, 0750); err != nil { - klog.Errorf("Failed to create mount directory %s: %v", mv.TargetPath, err) - continue - } + limits <- true // block until there is a free slot in the queue + defer func() { + <-limits // free a slot in the queue when we exit + }() - // Remount the volume - rcloneVol := &RcloneVolume{ - ID: mv.VolumeId, - Remote: mv.Remote, - RemotePath: mv.RemotePath, - } + ctxWithMountTimeout, cancel := context.WithTimeout(ctxWithTimeout, 30*time.Second) + defer cancel() - err := ns.RcloneOps.Mount(ctx, rcloneVol, mv.TargetPath, mv.ConfigData, mv.ReadOnly, mv.Parameters) - if err != nil { - klog.Errorf("Failed to remount volume %s: %v", volumeId, err) - // Don't return error here - continue with other volumes - } else { - klog.Infof("Successfully remounted volume %s", volumeId) - } + klog.Infof("Remounting volume %s to %s", volumeId, mv.TargetPath) + + // Create the mount directory if it doesn't exist + var err error + if err = os.MkdirAll(mv.TargetPath, 0750); err != nil { + klog.Errorf("Failed to create mount directory %s: %v", mv.TargetPath, err) + } else { + // Remount the volume + rcloneVol := &RcloneVolume{ + ID: mv.VolumeId, + Remote: mv.Remote, + RemotePath: mv.RemotePath, + } + + err = ns.RcloneOps.Mount(ctxWithMountTimeout, rcloneVol, mv.TargetPath, mv.ConfigData, mv.ReadOnly, mv.Parameters) + } + + results <- mountResult{volumeId, err} + }() } - return nil + for { + select { + case result := <-results: + if result.err != nil { + klog.Errorf("Failed to remount volume %s: %v", result.volumeID, result.err) + // Don't return an error here; continue with the other volumes so a single failed mount does not block all users. + delete(ns.mountedVolumes, result.volumeID) + // Should we keep it on disk? 
This will be lost on the first new mount, which will overwrite the file. + } else { + klog.Infof("Successfully remounted volume %s", result.volumeID) + } + case <-ctxWithTimeout.Done(): + return ctxWithTimeout.Err() + } + } } func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error { From 768ec194026958c56fe10a471eef795c25abbd33 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Wed, 3 Dec 2025 13:17:52 +0000 Subject: [PATCH 5/6] fix: Add missing channel closures --- pkg/rclone/nodeserver.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index b9a7b891..5ba07847 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -430,8 +430,11 @@ func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { // Limit the number of active workers to the number of CPU threads (arbitrarily chosen) limits := make(chan bool, runtime.GOMAXPROCS(0)) + defer close(limits) results := make(chan mountResult, len(ns.mountedVolumes)) + defer close(results) + ctxWithTimeout, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() From dfd641532a35fab7fbd7d6164d0c11b82e5de283 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Wed, 3 Dec 2025 16:33:53 +0100 Subject: [PATCH 6/6] fix: Remove WaitGroup and use a simple counter, and solve a bug --- pkg/rclone/nodeserver.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index 5ba07847..0d26791a 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -418,9 +418,6 @@ func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { ns.mutex.Lock() defer ns.mutex.Unlock() - wg := new(sync.WaitGroup) - defer wg.Wait() - if len(ns.mountedVolumes) == 0 { klog.Info("No tracked volumes to remount") return nil } @@ -432,17 +429,15 @@ func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { klog.Infof("Remounting %d tracked volumes", len(ns.mountedVolumes)) // Limit the number of active workers to the number of CPU threads (arbitrarily chosen) limits := make(chan bool, runtime.GOMAXPROCS(0)) defer close(limits) - results := make(chan mountResult, len(ns.mountedVolumes)) + volumesCount := len(ns.mountedVolumes) + results := make(chan mountResult, volumesCount) defer close(results) ctxWithTimeout, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() for volumeId, mv := range ns.mountedVolumes { - wg.Add(1) go func() { - defer wg.Done() - limits <- true // block until there is a free slot in the queue defer func() { <-limits // free a slot in the queue when we exit @@ -475,6 +470,7 @@ func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { for { select { case result := <-results: + volumesCount-- if result.err != nil { klog.Errorf("Failed to remount volume %s: %v", result.volumeID, result.err) // Don't return an error here; continue with the other volumes so a single failed mount does not block all users. delete(ns.mountedVolumes, result.volumeID) // Should we keep it on disk? This will be lost on the first new mount, which will overwrite the file. } else { klog.Infof("Successfully remounted volume %s", result.volumeID) } + if volumesCount == 0 { + return nil + } case <-ctxWithTimeout.Done(): return ctxWithTimeout.Err() }
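Taken together, patches 4 through 6 land on a worker-pool pattern that is easier to follow in isolation. The sketch below is illustrative only: `mount` is a placeholder for `RcloneOps.Mount`, and the volume names are made up. It shows the same moving parts: a buffered channel used as a semaphore to bound concurrency at `runtime.GOMAXPROCS(0)`, a results channel buffered to the number of volumes so workers can always deliver without blocking, a plain countdown instead of a `sync.WaitGroup`, and a 60-second overall timeout wrapping a 30-second per-mount timeout:

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

type result struct {
	id  string
	err error
}

// mount stands in for the real per-volume remount; it honors ctx.
func mount(ctx context.Context, id, target string) error {
	select {
	case <-time.After(10 * time.Millisecond): // pretend work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func remountAll(ctx context.Context, volumes map[string]string) error {
	remaining := len(volumes)
	if remaining == 0 {
		return nil
	}

	ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()

	// Semaphore: at most GOMAXPROCS mounts run at the same time.
	sem := make(chan struct{}, runtime.GOMAXPROCS(0))
	// Buffered so every worker can send its result without blocking,
	// even if the receive loop already bailed out on timeout.
	results := make(chan result, remaining)

	for id, target := range volumes {
		go func(id, target string) {
			sem <- struct{}{}        // take a slot
			defer func() { <-sem }() // free it on exit

			mountCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
			defer cancel()
			results <- result{id, mount(mountCtx, id, target)}
		}(id, target)
	}

	for {
		select {
		case r := <-results:
			remaining-- // the counter replacing the WaitGroup
			if r.err != nil {
				// Keep going: one failed volume must not block the rest.
				fmt.Printf("remount %s failed: %v\n", r.id, r.err)
			}
			if remaining == 0 {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	vols := map[string]string{"vol-1": "/mnt/vol-1", "vol-2": "/mnt/vol-2"}
	if err := remountAll(context.Background(), vols); err != nil {
		fmt.Println("remount loop aborted:", err)
	}
}
```

One deliberate difference from PATCH 5: the sketch never closes `sem` or `results`. Channels do not need to be closed to be garbage-collected, and leaving them open means a worker that is still running when the timeout path returns cannot panic by sending on a closed channel.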