Skip to content

Commit 81a8c1b

Browse files
sambuc/feat merge restart pr 1 (#77)
* Merge Re-mount volumes on a restart or update #72 * Ensure the mutex is not copied, even when the nodeServer is copied, by storing a pointer to the mutex instead of the mutex itself * Use Mutex instead of RWMutex, as having two readers of the variable at the same time means we are going to write the state at the same time, corrupting the state file on storage. * Use private freestanding functions for the mutex-guarded state, as Mutex / RWMutex are not recursive / re-entrant in Go. * Improve the remounting logic by using a rate-limited number of background goroutines and adding timeout values so that the service startup time is not dependent on the number of tracked volumes. --------- Co-authored-by: Brian Kanya <[email protected]>
1 parent 1b7ec91 commit 81a8c1b

File tree

4 files changed

+231
-6
lines changed

4 files changed

+231
-6
lines changed

.devcontainer/rclone/install.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,6 @@ rm -rf /tmp/rclone
1919
# Fix the $GOPATH folder
2020
chown -R "${USERNAME}:golang" /go
2121
chmod -R g+r+w /go
22+
23+
# Make sure the default folders exist
24+
mkdir -p /var/lib/kubelet/plugins/csi-rclone/

pkg/rclone/driver.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package rclone
22

33
import (
4+
"context"
45
"fmt"
56
"net"
67
"os"
8+
"path/filepath"
79
"sync"
810

911
"github.com/SwissDataScienceCenter/csi-rclone/pkg/kube"
@@ -78,14 +80,35 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st
7880
}
7981
rcloneOps := NewRclone(kubeClient, rclonePort, cacheDir, cacheSize)
8082

81-
return &nodeServer{
83+
// Use kubelet plugin directory for state persistence
84+
stateFile := "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json"
85+
86+
ns := &nodeServer{
8287
DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver),
8388
mounter: &mount.SafeFormatAndMount{
8489
Interface: mount.New(""),
8590
Exec: utilexec.New(),
8691
},
87-
RcloneOps: rcloneOps,
88-
}, nil
92+
RcloneOps: rcloneOps,
93+
mountedVolumes: make(map[string]MountedVolume),
94+
mutex: &sync.Mutex{},
95+
stateFile: stateFile,
96+
}
97+
98+
// Ensure the folder exists
99+
if err = os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil {
100+
return nil, fmt.Errorf("failed to create state directory: %v", err)
101+
}
102+
103+
// Load persisted state on startup
104+
ns.mutex.Lock()
105+
defer ns.mutex.Unlock()
106+
107+
if ns.mountedVolumes, err = readVolumeMap(ns.stateFile); err != nil {
108+
klog.Warningf("Failed to load persisted volume state: %v", err)
109+
}
110+
111+
return ns, nil
89112
}
90113

91114
func NewControllerServer(csiDriver *csicommon.CSIDriver) *controllerServer {
@@ -139,7 +162,13 @@ func (d *Driver) Run() error {
139162
)
140163
d.server = s
141164
if d.ns != nil && d.ns.RcloneOps != nil {
142-
return d.ns.RcloneOps.Run()
165+
onDaemonReady := func() error {
166+
if d.ns != nil {
167+
return d.ns.remountTrackedVolumes(context.Background())
168+
}
169+
return nil
170+
}
171+
return d.ns.RcloneOps.Run(onDaemonReady)
143172
}
144173
s.Wait()
145174
return nil

pkg/rclone/nodeserver.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ package rclone
77

88
import (
99
"bytes"
10+
"encoding/json"
1011
"errors"
1112
"fmt"
1213
"os"
14+
"runtime"
1315
"strings"
16+
"sync"
1417
"time"
1518

1619
"gopkg.in/ini.v1"
@@ -37,6 +40,23 @@ type nodeServer struct {
3740
*csicommon.DefaultNodeServer
3841
mounter *mount.SafeFormatAndMount
3942
RcloneOps Operations
43+
44+
// Track mounted volumes for automatic remounting
45+
mountedVolumes map[string]MountedVolume
46+
mutex *sync.Mutex
47+
stateFile string
48+
}
49+
50+
// MountedVolume records everything needed to re-create a single rclone
// mount after the CSI node plugin restarts. Instances are serialized to
// JSON in the node server's state file, so all fields must stay exported.
type MountedVolume struct {
	VolumeId        string            // CSI volume ID
	TargetPath      string            // kubelet target path the volume is mounted at
	Remote          string            // rclone remote name
	RemotePath      string            // path within the remote
	ConfigData      string            // rclone config contents used for the mount
	ReadOnly        bool              // whether the mount was requested read-only
	Parameters      map[string]string // extra mount parameters from the publish request
	SecretName      string            // secret holding rclone credentials, if any
	SecretNamespace string            // namespace of that secret
}
4161

4262
// Mounting Volume (Preparation)
@@ -141,6 +161,10 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
141161
}
142162
return nil, status.Error(codes.Internal, err.Error())
143163
}
164+
165+
// Track the mounted volume for automatic remounting
166+
ns.trackMountedVolume(volumeId, targetPath, remote, remotePath, configData, readOnly, flags, secretName, secretNamespace)
167+
144168
// err = ns.WaitForMountAvailable(targetPath)
145169
// if err != nil {
146170
// return nil, status.Error(codes.Internal, err.Error())
@@ -323,6 +347,10 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
323347
if err := ns.RcloneOps.Unmount(ctx, req.GetVolumeId(), targetPath); err != nil {
324348
klog.Warningf("Unmounting volume failed: %s", err)
325349
}
350+
351+
// Remove the volume from tracking
352+
ns.removeTrackedVolume(req.GetVolumeId())
353+
326354
mount.CleanupMountPoint(req.GetTargetPath(), ns.mounter, false)
327355
return &csi.NodeUnpublishVolumeResponse{}, nil
328356
}
@@ -344,6 +372,122 @@ func (*nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolu
344372
return nil, status.Errorf(codes.Unimplemented, "method NodeExpandVolume not implemented")
345373
}
346374

375+
// Track mounted volume for automatic remounting
376+
func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePath, configData string, readOnly bool, parameters map[string]string, secretName, secretNamespace string) {
377+
ns.mutex.Lock()
378+
defer ns.mutex.Unlock()
379+
380+
ns.mountedVolumes[volumeId] = MountedVolume{
381+
VolumeId: volumeId,
382+
TargetPath: targetPath,
383+
Remote: remote,
384+
RemotePath: remotePath,
385+
ConfigData: configData,
386+
ReadOnly: readOnly,
387+
Parameters: parameters,
388+
SecretName: secretName,
389+
SecretNamespace: secretNamespace,
390+
}
391+
klog.Infof("Tracked mounted volume %s at path %s", volumeId, targetPath)
392+
393+
if err := writeVolumeMap(ns.stateFile, ns.mountedVolumes); err != nil {
394+
klog.Errorf("Failed to persist volume state: %v", err)
395+
}
396+
}
397+
398+
// Remove tracked volume when unmounted
399+
func (ns *nodeServer) removeTrackedVolume(volumeId string) {
400+
ns.mutex.Lock()
401+
defer ns.mutex.Unlock()
402+
403+
delete(ns.mountedVolumes, volumeId)
404+
klog.Infof("Removed tracked volume %s", volumeId)
405+
406+
if err := writeVolumeMap(ns.stateFile, ns.mountedVolumes); err != nil {
407+
klog.Errorf("Failed to persist volume state: %v", err)
408+
}
409+
}
410+
411+
// Automatically remount all tracked volumes after daemon restart
412+
func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error {
413+
type mountResult struct {
414+
volumeID string
415+
err error
416+
}
417+
418+
ns.mutex.Lock()
419+
defer ns.mutex.Unlock()
420+
421+
if len(ns.mountedVolumes) == 0 {
422+
klog.Info("No tracked volumes to remount")
423+
return nil
424+
}
425+
426+
klog.Infof("Remounting %d tracked volumes", len(ns.mountedVolumes))
427+
428+
// Limit the number of active workers to the number of CPU threads (arbitrarily chosen)
429+
limits := make(chan bool, runtime.GOMAXPROCS(0))
430+
defer close(limits)
431+
432+
volumesCount := len(ns.mountedVolumes)
433+
results := make(chan mountResult, volumesCount)
434+
defer close(results)
435+
436+
ctxWithTimeout, cancel := context.WithTimeout(ctx, 60*time.Second)
437+
defer cancel()
438+
439+
for volumeId, mv := range ns.mountedVolumes {
440+
go func() {
441+
limits <- true // block until there is a free slot in the queue
442+
defer func() {
443+
<-limits // free a slot in the queue when we exit
444+
}()
445+
446+
ctxWithMountTimeout, cancel := context.WithTimeout(ctxWithTimeout, 30*time.Second)
447+
defer cancel()
448+
449+
klog.Infof("Remounting volume %s to %s", volumeId, mv.TargetPath)
450+
451+
// Create the mount directory if it doesn't exist
452+
var err error
453+
if err = os.MkdirAll(mv.TargetPath, 0750); err != nil {
454+
klog.Errorf("Failed to create mount directory %s: %v", mv.TargetPath, err)
455+
} else {
456+
// Remount the volume
457+
rcloneVol := &RcloneVolume{
458+
ID: mv.VolumeId,
459+
Remote: mv.Remote,
460+
RemotePath: mv.RemotePath,
461+
}
462+
463+
err = ns.RcloneOps.Mount(ctxWithMountTimeout, rcloneVol, mv.TargetPath, mv.ConfigData, mv.ReadOnly, mv.Parameters)
464+
}
465+
466+
results <- mountResult{volumeId, err}
467+
}()
468+
}
469+
470+
for {
471+
select {
472+
case result := <-results:
473+
volumesCount--
474+
if result.err != nil {
475+
klog.Errorf("Failed to remount volume %s: %v", result.volumeID, result.err)
476+
// Don't return error here, continue with other volumes not to block all users because of a failed mount.
477+
delete(ns.mountedVolumes, result.volumeID)
478+
// Should we keep it on disk? This will be lost on the first new mount which will override the file.
479+
} else {
480+
klog.Infof("Successfully remounted volume %s", result.volumeID)
481+
}
482+
if volumesCount == 0 {
483+
return nil
484+
}
485+
case <-ctxWithTimeout.Done():
486+
return ctxWithTimeout.Err()
487+
}
488+
}
489+
}
490+
347491
func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error {
348492
for {
349493
select {
@@ -357,3 +501,47 @@ func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error {
357501
}
358502
}
359503
}
504+
505+
// Persist volume state to disk
506+
func writeVolumeMap(filename string, volumes map[string]MountedVolume) error {
507+
if filename == "" {
508+
return nil
509+
}
510+
511+
data, err := json.Marshal(volumes)
512+
if err != nil {
513+
return fmt.Errorf("failed to marshal volume state: %v", err)
514+
}
515+
516+
if err := os.WriteFile(filename, data, 0600); err != nil {
517+
return fmt.Errorf("failed to write state file: %v", err)
518+
}
519+
520+
klog.Infof("Persisted volume state to %s", filename)
521+
return nil
522+
}
523+
524+
// Load volume state from disk
525+
func readVolumeMap(filename string) (map[string]MountedVolume, error) {
526+
volumes := make(map[string]MountedVolume)
527+
528+
if filename == "" {
529+
return volumes, nil
530+
}
531+
532+
data, err := os.ReadFile(filename)
533+
if err != nil {
534+
if os.IsNotExist(err) {
535+
klog.Info("No persisted volume state found, starting fresh")
536+
return volumes, nil
537+
}
538+
return volumes, fmt.Errorf("failed to read state file: %v", err)
539+
}
540+
541+
if err := json.Unmarshal(data, &volumes); err != nil {
542+
return nil, fmt.Errorf("failed to unmarshal volume state: %v", err)
543+
}
544+
545+
klog.Infof("Loaded %d tracked volumes from %s", len(volumes), filename)
546+
return volumes, nil
547+
}

pkg/rclone/rclone.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type Operations interface {
3434
Unmount(ctx context.Context, volumeId string, targetPath string) error
3535
GetVolumeById(ctx context.Context, volumeId string) (*RcloneVolume, error)
3636
Cleanup() error
37-
Run() error
37+
Run(onDaemonReady func() error) error
3838
}
3939

4040
type Rclone struct {
@@ -472,11 +472,16 @@ func (r *Rclone) start_daemon() error {
472472
return nil
473473
}
474474

475-
func (r *Rclone) Run() error {
475+
func (r *Rclone) Run(onDaemonReady func() error) error {
476476
err := r.start_daemon()
477477
if err != nil {
478478
return err
479479
}
480+
if onDaemonReady != nil {
481+
if err := onDaemonReady(); err != nil {
482+
klog.Warningf("Error in onDaemonReady callback: %v", err)
483+
}
484+
}
480485
// blocks until the rclone daemon is stopped
481486
return r.daemonCmd.Wait()
482487
}

0 commit comments

Comments
 (0)