From 9ee1179d2f97085df4512bd7fa3d4a2c097010c7 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Tue, 25 Nov 2025 12:37:03 +0100 Subject: [PATCH 01/11] chore: refactor CLI code --- cmd/csi-rclone-plugin/main.go | 62 ++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/cmd/csi-rclone-plugin/main.go b/cmd/csi-rclone-plugin/main.go index 2a238a7..905ee7e 100644 --- a/cmd/csi-rclone-plugin/main.go +++ b/cmd/csi-rclone-plugin/main.go @@ -48,34 +48,10 @@ func main() { Use: "run", Short: "Start the CSI driver.", } - root.AddCommand(runCmd) + nodeCommandLineParameters(runCmd) + controllerCommandLineParameters(runCmd) - runNode := &cobra.Command{ - Use: "node", - Short: "Start the CSI driver node service - expected to run in a daemonset on every node.", - Run: func(cmd *cobra.Command, args []string) { - handleNode() - }, - } - runNode.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id") - runNode.MarkPersistentFlagRequired("nodeid") - runNode.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint") - runNode.MarkPersistentFlagRequired("endpoint") - runNode.PersistentFlags().StringVar(&cacheDir, "cachedir", "", "cache dir") - runNode.PersistentFlags().StringVar(&cacheSize, "cachesize", "", "cache size") - runCmd.AddCommand(runNode) - runController := &cobra.Command{ - Use: "controller", - Short: "Start the CSI driver controller.", - Run: func(cmd *cobra.Command, args []string) { - handleController() - }, - } - runController.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id") - runController.MarkPersistentFlagRequired("nodeid") - runController.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint") - runController.MarkPersistentFlagRequired("endpoint") - runCmd.AddCommand(runController) + root.AddCommand(runCmd) versionCmd := &cobra.Command{ Use: "version", @@ -123,6 +99,23 @@ func handleNode() { } } +func nodeCommandLineParameters(runCmd *cobra.Command) { + runNode := 
&cobra.Command{ + Use: "node", + Short: "Start the CSI driver node service - expected to run in a daemonset on every node.", + Run: func(cmd *cobra.Command, args []string) { + handleNode() + }, + } + runNode.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id") + runNode.MarkPersistentFlagRequired("nodeid") + runNode.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint") + runNode.MarkPersistentFlagRequired("endpoint") + runNode.PersistentFlags().StringVar(&cacheDir, "cachedir", "", "cache dir") + runNode.PersistentFlags().StringVar(&cacheSize, "cachesize", "", "cache size") + runCmd.AddCommand(runNode) +} + func handleController() { d := rclone.NewDriver(nodeID, endpoint) cs := rclone.NewControllerServer(d.CSIDriver) @@ -134,6 +127,21 @@ func handleController() { } } +func controllerCommandLineParameters(runCmd *cobra.Command) { + runController := &cobra.Command{ + Use: "controller", + Short: "Start the CSI driver controller.", + Run: func(cmd *cobra.Command, args []string) { + handleController() + }, + } + runController.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id") + runController.MarkPersistentFlagRequired("nodeid") + runController.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint") + runController.MarkPersistentFlagRequired("endpoint") + runCmd.AddCommand(runController) +} + // unmountOldVols is used to unmount volumes after a restart on a node func unmountOldVols() error { const mountType = "fuse.rclone" From c9a8e7f3482afa38700cde9d94833b1387e5ffe0 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Tue, 25 Nov 2025 12:49:13 +0100 Subject: [PATCH 02/11] fix: refactor Controller --- pkg/rclone/controllerserver.go | 25 +++++++++++++++++++++++++ pkg/rclone/driver.go | 24 ------------------------ 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/pkg/rclone/controllerserver.go b/pkg/rclone/controllerserver.go index 4e00dd7..2002953 100644 --- a/pkg/rclone/controllerserver.go +++ 
b/pkg/rclone/controllerserver.go @@ -5,7 +5,9 @@ package rclone import ( "sync" + "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -22,6 +24,29 @@ type controllerServer struct { mutex sync.RWMutex } +func NewControllerServer(csiDriver *csicommon.CSIDriver) *controllerServer { + return &controllerServer{ + DefaultControllerServer: csicommon.NewDefaultControllerServer(csiDriver), + active_volumes: map[string]int64{}, + mutex: sync.RWMutex{}, + } +} + +func (cs *controllerServer) Metrics() []metrics.Observable { + var meters []metrics.Observable + + meter := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "csi_rclone_active_volume_count", + Help: "Number of active (Mounted) volumes.", + }) + meters = append(meters, + func() { meter.Set(float64(len(cs.active_volumes))) }, + ) + prometheus.MustRegister(meter) + + return meters +} + func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { volId := req.GetVolumeId() if len(volId) == 0 { diff --git a/pkg/rclone/driver.go b/pkg/rclone/driver.go index 49423a7..335060c 100644 --- a/pkg/rclone/driver.go +++ b/pkg/rclone/driver.go @@ -12,7 +12,6 @@ import ( "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/container-storage-interface/spec/lib/go/csi" csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" - "github.com/prometheus/client_golang/prometheus" "k8s.io/klog" "k8s.io/utils/mount" @@ -111,14 +110,6 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st return ns, nil } -func NewControllerServer(csiDriver *csicommon.CSIDriver) *controllerServer { - return &controllerServer{ - DefaultControllerServer: 
csicommon.NewDefaultControllerServer(csiDriver), - active_volumes: map[string]int64{}, - mutex: sync.RWMutex{}, - } -} - func (ns *nodeServer) Metrics() []metrics.Observable { var meters []metrics.Observable @@ -127,21 +118,6 @@ func (ns *nodeServer) Metrics() []metrics.Observable { return meters } -func (cs *controllerServer) Metrics() []metrics.Observable { - var meters []metrics.Observable - - meter := prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "csi_rclone_active_volume_count", - Help: "Number of active (Mounted) volumes.", - }) - meters = append(meters, - func() { meter.Set(float64(len(cs.active_volumes))) }, - ) - prometheus.MustRegister(meter) - - return meters -} - func (d *Driver) WithNodeServer(ns *nodeServer) *Driver { d.ns = ns return d From 4b774e60d04b8fff33b1c1313273923a66a4042c Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Tue, 25 Nov 2025 13:43:41 +0100 Subject: [PATCH 03/11] fix: refactor NodeServer --- cmd/csi-rclone-plugin/main.go | 30 ---------- pkg/rclone/driver.go | 72 ---------------------- pkg/rclone/nodeserver.go | 109 ++++++++++++++++++++++++++++++++-- 3 files changed, 103 insertions(+), 108 deletions(-) diff --git a/cmd/csi-rclone-plugin/main.go b/cmd/csi-rclone-plugin/main.go index 905ee7e..4c86336 100644 --- a/cmd/csi-rclone-plugin/main.go +++ b/cmd/csi-rclone-plugin/main.go @@ -12,8 +12,6 @@ import ( "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/SwissDataScienceCenter/csi-rclone/pkg/rclone" "github.com/spf13/cobra" - "k8s.io/klog" - mountUtils "k8s.io/mount-utils" ) var ( @@ -82,10 +80,6 @@ func main() { } func handleNode() { - err := unmountOldVols() - if err != nil { - klog.Warningf("There was an error when trying to unmount old volumes: %v", err) - } d := rclone.NewDriver(nodeID, endpoint) ns, err := rclone.NewNodeServer(d.CSIDriver, cacheDir, cacheSize) if err != nil { @@ -141,27 +135,3 @@ func controllerCommandLineParameters(runCmd *cobra.Command) { 
runController.MarkPersistentFlagRequired("endpoint") runCmd.AddCommand(runController) } - -// unmountOldVols is used to unmount volumes after a restart on a node -func unmountOldVols() error { - const mountType = "fuse.rclone" - const unmountTimeout = time.Second * 5 - klog.Info("Checking for existing mounts") - mounter := mountUtils.Mounter{} - mounts, err := mounter.List() - if err != nil { - return err - } - for _, mount := range mounts { - if mount.Type != mountType { - continue - } - err := mounter.UnmountWithForce(mount.Path, unmountTimeout) - if err != nil { - klog.Warningf("Failed to unmount %s because of %v.", mount.Path, err) - continue - } - klog.Infof("Sucessfully unmounted %s", mount.Path) - } - return nil -} diff --git a/pkg/rclone/driver.go b/pkg/rclone/driver.go index 335060c..c152dff 100644 --- a/pkg/rclone/driver.go +++ b/pkg/rclone/driver.go @@ -2,20 +2,11 @@ package rclone import ( "context" - "fmt" - "net" "os" - "path/filepath" - "sync" - "github.com/SwissDataScienceCenter/csi-rclone/pkg/kube" - "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/container-storage-interface/spec/lib/go/csi" csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" "k8s.io/klog" - "k8s.io/utils/mount" - - utilexec "k8s.io/utils/exec" ) type Driver struct { @@ -33,18 +24,6 @@ var ( DriverVersion = "SwissDataScienceCenter" ) -func getFreePort() (port int, err error) { - var a *net.TCPAddr - if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil { - var l *net.TCPListener - if l, err = net.ListenTCP("tcp", a); err == nil { - defer l.Close() - return l.Addr().(*net.TCPAddr).Port, nil - } - } - return -} - func NewDriver(nodeID, endpoint string) *Driver { driverName := os.Getenv("DRIVER_NAME") if driverName == "" { @@ -67,57 +46,6 @@ func NewDriver(nodeID, endpoint string) *Driver { return d } -func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize string) (*nodeServer, error) { - kubeClient, err := 
kube.GetK8sClient() - if err != nil { - return nil, err - } - - rclonePort, err := getFreePort() - if err != nil { - return nil, fmt.Errorf("Cannot get a free TCP port to run rclone") - } - rcloneOps := NewRclone(kubeClient, rclonePort, cacheDir, cacheSize) - - // Use kubelet plugin directory for state persistence - stateFile := "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json" - - ns := &nodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver), - mounter: &mount.SafeFormatAndMount{ - Interface: mount.New(""), - Exec: utilexec.New(), - }, - RcloneOps: rcloneOps, - mountedVolumes: make(map[string]MountedVolume), - mutex: &sync.Mutex{}, - stateFile: stateFile, - } - - // Ensure the folder exists - if err = os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil { - return nil, fmt.Errorf("failed to create state directory: %v", err) - } - - // Load persisted state on startup - ns.mutex.Lock() - defer ns.mutex.Unlock() - - if ns.mountedVolumes, err = readVolumeMap(ns.stateFile); err != nil { - klog.Warningf("Failed to load persisted volume state: %v", err) - } - - return ns, nil -} - -func (ns *nodeServer) Metrics() []metrics.Observable { - var meters []metrics.Observable - - // What should we meter? 
- - return meters -} - func (d *Driver) WithNodeServer(ns *nodeServer) *Driver { d.ns = ns return d diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index 0d26791..db11373 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -10,24 +10,28 @@ import ( "encoding/json" "errors" "fmt" + "net" "os" + "path/filepath" "runtime" "strings" "sync" "time" - "gopkg.in/ini.v1" - v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/klog" - "github.com/SwissDataScienceCenter/csi-rclone/pkg/kube" + "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/fernet/fernet-go" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "gopkg.in/ini.v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog" + mountutils "k8s.io/mount-utils" + "k8s.io/utils/exec" "k8s.io/utils/mount" csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" @@ -47,6 +51,99 @@ type nodeServer struct { stateFile string } +// unmountOldVols is used to unmount volumes after a restart on a node +func unmountOldVols() error { + const mountType = "fuse.rclone" + const unmountTimeout = time.Second * 5 + klog.Info("Checking for existing mounts") + mounter := mountutils.Mounter{} + mounts, err := mounter.List() + if err != nil { + return err + } + for _, mount := range mounts { + if mount.Type != mountType { + continue + } + err := mounter.UnmountWithForce(mount.Path, unmountTimeout) + if err != nil { + klog.Warningf("Failed to unmount %s because of %v.", mount.Path, err) + continue + } + klog.Infof("Sucessfully unmounted %s", mount.Path) + } + return nil +} + +func getFreePort() (port int, err error) { + var a *net.TCPAddr + if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil { + var l *net.TCPListener 
+ if l, err = net.ListenTCP("tcp", a); err == nil { + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil + } + } + return +} + +func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize string) (*nodeServer, error) { + err := unmountOldVols() + if err != nil { + klog.Warningf("There was an error when trying to unmount old volumes: %v", err) + return nil, err + } + + kubeClient, err := kube.GetK8sClient() + if err != nil { + return nil, err + } + + rclonePort, err := getFreePort() + if err != nil { + return nil, fmt.Errorf("Cannot get a free TCP port to run rclone") + } + rcloneOps := NewRclone(kubeClient, rclonePort, cacheDir, cacheSize) + + // Use kubelet plugin directory for state persistence + stateFile := "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json" + + ns := &nodeServer{ + DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver), + mounter: &mount.SafeFormatAndMount{ + Interface: mount.New(""), + Exec: exec.New(), + }, + RcloneOps: rcloneOps, + mountedVolumes: make(map[string]MountedVolume), + mutex: &sync.Mutex{}, + stateFile: stateFile, + } + + // Ensure the folder exists + if err = os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil { + return nil, fmt.Errorf("failed to create state directory: %v", err) + } + + // Load persisted state on startup + ns.mutex.Lock() + defer ns.mutex.Unlock() + + if ns.mountedVolumes, err = readVolumeMap(ns.stateFile); err != nil { + klog.Warningf("Failed to load persisted volume state: %v", err) + } + + return ns, nil +} + +func (ns *nodeServer) Metrics() []metrics.Observable { + var meters []metrics.Observable + + // What should we meter? 
+ + return meters +} + type MountedVolume struct { VolumeId string TargetPath string From 8b45ea003546cbb1a0dab471cc7fe36c8facbdbd Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Fri, 28 Nov 2025 09:30:01 +0100 Subject: [PATCH 04/11] fix: make controllerServer public --- pkg/rclone/controllerserver.go | 24 ++++++++++++------------ pkg/rclone/driver.go | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/rclone/controllerserver.go b/pkg/rclone/controllerserver.go index 2002953..d0a2059 100644 --- a/pkg/rclone/controllerserver.go +++ b/pkg/rclone/controllerserver.go @@ -18,21 +18,21 @@ import ( const secretAnnotationName = "csi-rclone.dev/secretName" -type controllerServer struct { +type ControllerServer struct { *csicommon.DefaultControllerServer active_volumes map[string]int64 mutex sync.RWMutex } -func NewControllerServer(csiDriver *csicommon.CSIDriver) *controllerServer { - return &controllerServer{ +func NewControllerServer(csiDriver *csicommon.CSIDriver) *ControllerServer { + return &ControllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(csiDriver), active_volumes: map[string]int64{}, mutex: sync.RWMutex{}, } } -func (cs *controllerServer) Metrics() []metrics.Observable { +func (cs *ControllerServer) Metrics() []metrics.Observable { var meters []metrics.Observable meter := prometheus.NewGauge(prometheus.GaugeOpts{ @@ -47,7 +47,7 @@ func (cs *controllerServer) Metrics() []metrics.Observable { return meters } -func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { +func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { volId := req.GetVolumeId() if len(volId) == 0 { return nil, status.Error(codes.InvalidArgument, "ValidateVolumeCapabilities must be provided volume id") @@ -71,17 
+71,17 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req } // Attaching Volume -func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { +func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ControllerPublishVolume not implemented") } // Detaching Volume -func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { +func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ControllerUnpublishVolume not implemented") } // Provisioning Volumes -func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { +func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { klog.Infof("ControllerCreateVolume: called with args %+v", *req) volumeName := req.GetName() if len(volumeName) == 0 { @@ -140,7 +140,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } // Delete Volume -func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { +func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volId := req.GetVolumeId() if len(volId) == 0 { return nil, status.Error(codes.InvalidArgument, "DeteleVolume must be provided volume id") @@ -152,16 +152,16 @@ func (cs *controllerServer) DeleteVolume(ctx 
context.Context, req *csi.DeleteVol return &csi.DeleteVolumeResponse{}, nil } -func (*controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { +func (*ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ControllerExpandVolume not implemented") } -func (cs *controllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { +func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { return &csi.ControllerGetVolumeResponse{Volume: &csi.Volume{ VolumeId: req.VolumeId, }}, nil } -func (cs *controllerServer) ControllerModifyVolume(ctx context.Context, req *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) { +func (cs *ControllerServer) ControllerModifyVolume(ctx context.Context, req *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) { return &csi.ControllerModifyVolumeResponse{}, nil } diff --git a/pkg/rclone/driver.go b/pkg/rclone/driver.go index c152dff..27367b0 100644 --- a/pkg/rclone/driver.go +++ b/pkg/rclone/driver.go @@ -14,7 +14,7 @@ type Driver struct { endpoint string ns *nodeServer - cs *controllerServer + cs *ControllerServer cap []*csi.VolumeCapability_AccessMode cscap []*csi.ControllerServiceCapability server csicommon.NonBlockingGRPCServer @@ -51,7 +51,7 @@ func (d *Driver) WithNodeServer(ns *nodeServer) *Driver { return d } -func (d *Driver) WithControllerServer(cs *controllerServer) *Driver { +func (d *Driver) WithControllerServer(cs *ControllerServer) *Driver { d.cs = cs return d } From 64700426b5f91aa91eb06cc0c4fdb9712b86d04c Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Fri, 28 Nov 2025 
09:57:45 +0100 Subject: [PATCH 05/11] fix: make NodeServer public --- pkg/rclone/driver.go | 4 ++-- pkg/rclone/nodeserver.go | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/rclone/driver.go b/pkg/rclone/driver.go index 27367b0..4110d98 100644 --- a/pkg/rclone/driver.go +++ b/pkg/rclone/driver.go @@ -13,7 +13,7 @@ type Driver struct { CSIDriver *csicommon.CSIDriver endpoint string - ns *nodeServer + ns *NodeServer cs *ControllerServer cap []*csi.VolumeCapability_AccessMode cscap []*csi.ControllerServiceCapability @@ -46,7 +46,7 @@ func NewDriver(nodeID, endpoint string) *Driver { return d } -func (d *Driver) WithNodeServer(ns *nodeServer) *Driver { +func (d *Driver) WithNodeServer(ns *NodeServer) *Driver { d.ns = ns return d } diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index db11373..b686082 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -40,7 +40,7 @@ import ( const CSI_ANNOTATION_PREFIX = "csi-rclone.dev" const pvcSecretNameAnnotation = CSI_ANNOTATION_PREFIX + "/secretName" -type nodeServer struct { +type NodeServer struct { *csicommon.DefaultNodeServer mounter *mount.SafeFormatAndMount RcloneOps Operations @@ -87,7 +87,7 @@ func getFreePort() (port int, err error) { return } -func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize string) (*nodeServer, error) { +func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize string) (*NodeServer, error) { err := unmountOldVols() if err != nil { klog.Warningf("There was an error when trying to unmount old volumes: %v", err) @@ -108,7 +108,7 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st // Use kubelet plugin directory for state persistence stateFile := "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json" - ns := &nodeServer{ + ns := &NodeServer{ DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver), mounter: 
&mount.SafeFormatAndMount{ Interface: mount.New(""), @@ -136,7 +136,7 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st return ns, nil } -func (ns *nodeServer) Metrics() []metrics.Observable { +func (ns *NodeServer) Metrics() []metrics.Observable { var meters []metrics.Observable // What should we meter? @@ -157,16 +157,16 @@ type MountedVolume struct { } // Mounting Volume (Preparation) -func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { +func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not implemented") } -func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { +func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not implemented") } // Mounting Volume (Actual Mounting) -func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { +func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { if err := validatePublishVolumeRequest(req); err != nil { return nil, err } @@ -424,7 +424,7 @@ func extractConfigData(parameters map[string]string) (string, map[string]string) } // Unmounting Volumes -func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { +func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { klog.Infof("NodeUnpublishVolume called 
with: %s", req) if err := validateUnPublishVolumeRequest(req); err != nil { return nil, err @@ -465,12 +465,12 @@ func validateUnPublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error { } // Resizing Volume -func (*nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { +func (*NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method NodeExpandVolume not implemented") } // Track mounted volume for automatic remounting -func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePath, configData string, readOnly bool, parameters map[string]string, secretName, secretNamespace string) { +func (ns *NodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePath, configData string, readOnly bool, parameters map[string]string, secretName, secretNamespace string) { ns.mutex.Lock() defer ns.mutex.Unlock() @@ -493,7 +493,7 @@ func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePat } // Remove tracked volume when unmounted -func (ns *nodeServer) removeTrackedVolume(volumeId string) { +func (ns *NodeServer) removeTrackedVolume(volumeId string) { ns.mutex.Lock() defer ns.mutex.Unlock() @@ -506,7 +506,7 @@ func (ns *nodeServer) removeTrackedVolume(volumeId string) { } // Automatically remount all tracked volumes after daemon restart -func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { +func (ns *NodeServer) remountTrackedVolumes(ctx context.Context) error { type mountResult struct { volumeID string err error @@ -585,7 +585,7 @@ func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error { } } -func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error { +func (ns *NodeServer) WaitForMountAvailable(mountpoint string) error { for { select { case <-time.After(100 * time.Millisecond): From 
d5abd6e4a1b19b54478be208691ef59acf990b1f Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Fri, 28 Nov 2025 10:56:16 +0100 Subject: [PATCH 06/11] fix: cleanup controllerserver.go warnings --- pkg/rclone/controllerserver.go | 45 +++++++++++++++++----------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/pkg/rclone/controllerserver.go b/pkg/rclone/controllerserver.go index d0a2059..c60e25f 100644 --- a/pkg/rclone/controllerserver.go +++ b/pkg/rclone/controllerserver.go @@ -3,12 +3,12 @@ package rclone import ( + "context" "sync" "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/prometheus/client_golang/prometheus" - "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog" @@ -20,14 +20,14 @@ const secretAnnotationName = "csi-rclone.dev/secretName" type ControllerServer struct { *csicommon.DefaultControllerServer - active_volumes map[string]int64 - mutex sync.RWMutex + activeVolumes map[string]int64 + mutex sync.RWMutex } func NewControllerServer(csiDriver *csicommon.CSIDriver) *ControllerServer { return &ControllerServer{ DefaultControllerServer: csicommon.NewDefaultControllerServer(csiDriver), - active_volumes: map[string]int64{}, + activeVolumes: map[string]int64{}, mutex: sync.RWMutex{}, } } @@ -40,14 +40,14 @@ func (cs *ControllerServer) Metrics() []metrics.Observable { Help: "Number of active (Mounted) volumes.", }) meters = append(meters, - func() { meter.Set(float64(len(cs.active_volumes))) }, + func() { meter.Set(float64(len(cs.activeVolumes))) }, ) prometheus.MustRegister(meter) return meters } -func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { +func (cs *ControllerServer) ValidateVolumeCapabilities(_ context.Context, req *csi.ValidateVolumeCapabilitiesRequest) 
(*csi.ValidateVolumeCapabilitiesResponse, error) { volId := req.GetVolumeId() if len(volId) == 0 { return nil, status.Error(codes.InvalidArgument, "ValidateVolumeCapabilities must be provided volume id") @@ -58,7 +58,7 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req cs.mutex.Lock() defer cs.mutex.Unlock() - if _, ok := cs.active_volumes[volId]; !ok { + if _, ok := cs.activeVolumes[volId]; !ok { return nil, status.Errorf(codes.NotFound, "Volume %s not found", volId) } return &csi.ValidateVolumeCapabilitiesResponse{ @@ -70,17 +70,17 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req }, nil } -// Attaching Volume -func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { +// ControllerPublishVolume Attaching Volume +func (cs *ControllerServer) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ControllerPublishVolume not implemented") } -// Detaching Volume -func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { +// ControllerUnpublishVolume Detaching Volume +func (cs *ControllerServer) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ControllerUnpublishVolume not implemented") } -// Provisioning Volumes +// CreateVolume Provisioning Volumes func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { klog.Infof("ControllerCreateVolume: called with args %+v", *req) volumeName := req.GetName() @@ -95,18 +95,18 @@ func (cs *ControllerServer) 
CreateVolume(ctx context.Context, req *csi.CreateVol // we don't use the size as it makes no sense for rclone. but csi drivers should succeed if // called twice with the same capacity for the same volume and fail if called twice with // differing capacity, so we need to remember it - volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes()) + volSizeBytes := req.GetCapacityRange().GetRequiredBytes() cs.mutex.Lock() defer cs.mutex.Unlock() - if val, ok := cs.active_volumes[volumeName]; ok && val != volSizeBytes { + if val, ok := cs.activeVolumes[volumeName]; ok && val != volSizeBytes { return nil, status.Errorf(codes.AlreadyExists, "Volume operation already exists for volume %s", volumeName) } - cs.active_volumes[volumeName] = volSizeBytes + cs.activeVolumes[volumeName] = volSizeBytes // See https://github.com/kubernetes-csi/external-provisioner/blob/v5.1.0/pkg/controller/controller.go#L75 // on how parameters from the persistent volume are parsed // We have to pass the secret name and namespace into the context so that the node server can use them - // The external provisioner uses the secret name and namespace but it does not pass them into the request, + // The external provisioner uses the secret name and namespace, but it does not pass them into the request, // so we read the PVC here to extract them ourselves because we may need them in the node server for decoding secrets. 
pvcName, pvcNameFound := req.Parameters["csi.storage.k8s.io/pvc/name"] pvcNamespace, pvcNamespaceFound := req.Parameters["csi.storage.k8s.io/pvc/namespace"] @@ -139,29 +139,28 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } -// Delete Volume -func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { +func (cs *ControllerServer) DeleteVolume(_ context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { volId := req.GetVolumeId() if len(volId) == 0 { - return nil, status.Error(codes.InvalidArgument, "DeteleVolume must be provided volume id") + return nil, status.Error(codes.InvalidArgument, "DeleteVolume must be provided volume id") } cs.mutex.Lock() defer cs.mutex.Unlock() - delete(cs.active_volumes, volId) + delete(cs.activeVolumes, volId) return &csi.DeleteVolumeResponse{}, nil } -func (*ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { +func (*ControllerServer) ControllerExpandVolume(_ context.Context, _ *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ControllerExpandVolume not implemented") } -func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { +func (cs *ControllerServer) ControllerGetVolume(_ context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { return &csi.ControllerGetVolumeResponse{Volume: &csi.Volume{ VolumeId: req.VolumeId, }}, nil } -func (cs *ControllerServer) ControllerModifyVolume(ctx context.Context, req *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) { +func (cs *ControllerServer) ControllerModifyVolume(_ context.Context, _ *csi.ControllerModifyVolumeRequest) 
(*csi.ControllerModifyVolumeResponse, error) { return &csi.ControllerModifyVolumeResponse{}, nil } From 4c3450a2dfe54a93d641e140d81bf58a9d342e99 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Fri, 28 Nov 2025 13:28:43 +0100 Subject: [PATCH 07/11] fix: cleanup main.go warnings --- cmd/csi-rclone-plugin/main.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cmd/csi-rclone-plugin/main.go b/cmd/csi-rclone-plugin/main.go index 4c86336..cc8ef5d 100644 --- a/cmd/csi-rclone-plugin/main.go +++ b/cmd/csi-rclone-plugin/main.go @@ -12,6 +12,7 @@ import ( "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/SwissDataScienceCenter/csi-rclone/pkg/rclone" "github.com/spf13/cobra" + "k8s.io/klog" ) var ( @@ -22,8 +23,15 @@ var ( meters []metrics.Observable ) +func exitOnError(err error) { + if err != nil { + klog.Error(err.Error()) + os.Exit(1) + } +} + func init() { - flag.Set("logtostderr", "true") + exitOnError(flag.Set("logtostderr", "true")) } func main() { @@ -60,7 +68,7 @@ func main() { } root.AddCommand(versionCmd) - root.ParseFlags(os.Args[1:]) + exitOnError(root.ParseFlags(os.Args[1:])) if metricsServerConfig.Enabled { // Gracefully exit the metrics background servers @@ -71,10 +79,7 @@ func main() { go metricsServer.ListenAndServe() } - if err := root.Execute(); err != nil { - fmt.Fprintf(os.Stderr, "%s", err.Error()) - os.Exit(1) - } + exitOnError(root.Execute()) os.Exit(0) } From e89cf484eb8fbd801c9be7a646ec4c543faf06a7 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Fri, 28 Nov 2025 18:05:52 +0100 Subject: [PATCH 08/11] fix: refactor rclone.Driver out --- cmd/csi-rclone-plugin/main.go | 99 ++++++++++++++++++++--------------- pkg/rclone/driver.go | 79 ++++++++-------------------- pkg/rclone/nodeserver.go | 22 +++++--- test/sanity_test.go | 27 +++++++--- 4 files changed, 112 insertions(+), 115 deletions(-) diff --git a/cmd/csi-rclone-plugin/main.go b/cmd/csi-rclone-plugin/main.go index cc8ef5d..3565240 
100644 --- a/cmd/csi-rclone-plugin/main.go +++ b/cmd/csi-rclone-plugin/main.go @@ -11,6 +11,8 @@ import ( "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/SwissDataScienceCenter/csi-rclone/pkg/rclone" + "github.com/container-storage-interface/spec/lib/go/csi" + csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" "github.com/spf13/cobra" "k8s.io/klog" ) @@ -54,8 +56,8 @@ func main() { Use: "run", Short: "Start the CSI driver.", } - nodeCommandLineParameters(runCmd) - controllerCommandLineParameters(runCmd) + exitOnError(NodeCommandLineParameters(runCmd, &meters, &nodeID, &endpoint, &cacheDir, &cacheSize)) + exitOnError(ControllerCommandLineParameters(runCmd, &meters, &nodeID, &endpoint)) root.AddCommand(runCmd) @@ -84,59 +86,70 @@ func main() { os.Exit(0) } -func handleNode() { - d := rclone.NewDriver(nodeID, endpoint) - ns, err := rclone.NewNodeServer(d.CSIDriver, cacheDir, cacheSize) - if err != nil { - panic(err) - } - meters = append(meters, ns.Metrics()...) 
- d.WithNodeServer(ns) - err = d.Run() - if err != nil { - panic(err) - } -} - -func nodeCommandLineParameters(runCmd *cobra.Command) { +func NodeCommandLineParameters(runCmd *cobra.Command, meters *[]metrics.Observable, nodeID, endpoint, cacheDir, cacheSize *string) error { runNode := &cobra.Command{ Use: "node", Short: "Start the CSI driver node service - expected to run in a daemonset on every node.", - Run: func(cmd *cobra.Command, args []string) { - handleNode() + RunE: func(cmd *cobra.Command, args []string) error { + //Pointers are passed by value, so we use a pointer to a pointer to be able to retrieve the allocated server in the + //run closure + var nsDoublePointer **rclone.NodeServer + return rclone.Run(context.Background(), nodeID, endpoint, + func(csiDriver *csicommon.CSIDriver) (csi.ControllerServer, csi.NodeServer, error) { + ns, err := rclone.NewNodeServer(csiDriver, *cacheDir, *cacheSize) + if err != nil { + return nil, nil, err + } + *meters = append(*meters, ns.Metrics()...) + *nsDoublePointer = ns + return nil, ns, nil + }, + func(ctx context.Context) error { + return (*nsDoublePointer).Run(ctx) + }, + ) }, } - runNode.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id") - runNode.MarkPersistentFlagRequired("nodeid") - runNode.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint") - runNode.MarkPersistentFlagRequired("endpoint") - runNode.PersistentFlags().StringVar(&cacheDir, "cachedir", "", "cache dir") - runNode.PersistentFlags().StringVar(&cacheSize, "cachesize", "", "cache size") - runCmd.AddCommand(runNode) -} - -func handleController() { - d := rclone.NewDriver(nodeID, endpoint) - cs := rclone.NewControllerServer(d.CSIDriver) - meters = append(meters, cs.Metrics()...) 
- d.WithControllerServer(cs) - err := d.Run() - if err != nil { - panic(err) + runNode.PersistentFlags().StringVar(nodeID, "nodeid", "", "node id") + if err := runNode.MarkPersistentFlagRequired("nodeid"); err != nil { + return err } + runNode.PersistentFlags().StringVar(endpoint, "endpoint", "", "CSI endpoint") + if err := runNode.MarkPersistentFlagRequired("endpoint"); err != nil { + return err + } + runNode.PersistentFlags().StringVar(cacheDir, "cachedir", "", "cache dir") + runNode.PersistentFlags().StringVar(cacheSize, "cachesize", "", "cache size") + runCmd.AddCommand(runNode) + return nil } -func controllerCommandLineParameters(runCmd *cobra.Command) { +func ControllerCommandLineParameters(runCmd *cobra.Command, meters *[]metrics.Observable, nodeID, endpoint *string) error { runController := &cobra.Command{ Use: "controller", Short: "Start the CSI driver controller.", - Run: func(cmd *cobra.Command, args []string) { - handleController() + RunE: func(cmd *cobra.Command, args []string) error { + return rclone.Run(context.Background(), + nodeID, + endpoint, + func(csiDriver *csicommon.CSIDriver) (csi.ControllerServer, csi.NodeServer, error) { + cs := rclone.NewControllerServer(csiDriver) + *meters = append(*meters, cs.Metrics()...) 
+ return cs, nil, nil + }, + func(_ context.Context) error { return nil }, + ) }, } - runController.PersistentFlags().StringVar(&nodeID, "nodeid", "", "node id") - runController.MarkPersistentFlagRequired("nodeid") - runController.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint") - runController.MarkPersistentFlagRequired("endpoint") + runController.PersistentFlags().StringVar(nodeID, "nodeid", "", "node id") + if err := runController.MarkPersistentFlagRequired("nodeid"); err != nil { + return err + } + runController.PersistentFlags().StringVar(endpoint, "endpoint", "", "CSI endpoint") + if err := runController.MarkPersistentFlagRequired("endpoint"); err != nil { + return err + } + runCmd.AddCommand(runController) + return nil } diff --git a/pkg/rclone/driver.go b/pkg/rclone/driver.go index 4110d98..23bd58f 100644 --- a/pkg/rclone/driver.go +++ b/pkg/rclone/driver.go @@ -2,6 +2,7 @@ package rclone import ( "context" + "errors" "os" "github.com/container-storage-interface/spec/lib/go/csi" @@ -9,82 +10,44 @@ import ( "k8s.io/klog" ) -type Driver struct { - CSIDriver *csicommon.CSIDriver - endpoint string - - ns *NodeServer - cs *ControllerServer - cap []*csi.VolumeCapability_AccessMode - cscap []*csi.ControllerServiceCapability - server csicommon.NonBlockingGRPCServer -} - var ( DriverVersion = "SwissDataScienceCenter" ) -func NewDriver(nodeID, endpoint string) *Driver { +type DriverSetup func(csiDriver *csicommon.CSIDriver) (csi.ControllerServer, csi.NodeServer, error) + +type DriverServe func(ctx context.Context) error + +func Run(ctx context.Context, nodeID, endpoint *string, setup DriverSetup, serve DriverServe) error { driverName := os.Getenv("DRIVER_NAME") if driverName == "" { - panic("DriverName env var not set!") + return errors.New("DRIVER_NAME env variable not set or empty") } klog.Infof("Starting new %s RcloneDriver in version %s", driverName, DriverVersion) - d := &Driver{} - d.endpoint = endpoint - - d.CSIDriver = 
csicommon.NewCSIDriver(driverName, DriverVersion, nodeID) - d.CSIDriver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ + driver := csicommon.NewCSIDriver(driverName, DriverVersion, *nodeID) + driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, }) - d.CSIDriver.AddControllerServiceCapabilities( + driver.AddControllerServiceCapabilities( []csi.ControllerServiceCapability_RPC_Type{ csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, }) - return d -} - -func (d *Driver) WithNodeServer(ns *NodeServer) *Driver { - d.ns = ns - return d -} - -func (d *Driver) WithControllerServer(cs *ControllerServer) *Driver { - d.cs = cs - return d -} + is := csicommon.NewDefaultIdentityServer(driver) + cs, ns, setupErr := setup(driver) + if setupErr != nil { + return setupErr + } -func (d *Driver) Run() error { s := csicommon.NewNonBlockingGRPCServer() - s.Start( - d.endpoint, - csicommon.NewDefaultIdentityServer(d.CSIDriver), - d.cs, - d.ns, - ) - d.server = s - if d.ns != nil && d.ns.RcloneOps != nil { - onDaemonReady := func() error { - if d.ns != nil { - return d.ns.remountTrackedVolumes(context.Background()) - } - return nil - } - return d.ns.RcloneOps.Run(onDaemonReady) + defer s.Stop() + s.Start(*endpoint, is, cs, ns) + + if err := serve(ctx); err != nil { + return err } + s.Wait() return nil } - -func (d *Driver) Stop() error { - var err error - if d.ns != nil && d.ns.RcloneOps != nil { - err = d.ns.RcloneOps.Cleanup() - } - if d.server != nil { - d.server.Stop() - } - return err -} diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index b686082..56ff32d 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -103,10 +103,6 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st if err != nil { return nil, fmt.Errorf("Cannot get a free TCP port to run rclone") } - rcloneOps := NewRclone(kubeClient, 
rclonePort, cacheDir, cacheSize) - - // Use kubelet plugin directory for state persistence - stateFile := "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json" ns := &NodeServer{ DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver), @@ -114,10 +110,11 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st Interface: mount.New(""), Exec: exec.New(), }, - RcloneOps: rcloneOps, + RcloneOps: NewRclone(kubeClient, rclonePort, cacheDir, cacheSize), mountedVolumes: make(map[string]MountedVolume), mutex: &sync.Mutex{}, - stateFile: stateFile, + // Use kubelet plugin directory for state persistence + stateFile: "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json", } // Ensure the folder exists @@ -136,6 +133,13 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st return ns, nil } +func (ns *NodeServer) Run(ctx context.Context) error { + defer ns.Stop() + return ns.RcloneOps.Run(func() error { + return ns.remountTrackedVolumes(ctx) + }) +} + func (ns *NodeServer) Metrics() []metrics.Observable { var meters []metrics.Observable @@ -144,6 +148,12 @@ func (ns *NodeServer) Metrics() []metrics.Observable { return meters } +func (ns *NodeServer) Stop() { + if err := ns.RcloneOps.Cleanup(); err != nil { + klog.Errorf("Failed to cleanup rclone ops: %v", err) + } +} + type MountedVolume struct { VolumeId string TargetPath string diff --git a/test/sanity_test.go b/test/sanity_test.go index 0527cd4..6e400d9 100644 --- a/test/sanity_test.go +++ b/test/sanity_test.go @@ -9,9 +9,11 @@ import ( "github.com/SwissDataScienceCenter/csi-rclone/pkg/kube" "github.com/SwissDataScienceCenter/csi-rclone/pkg/rclone" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/google/uuid" "github.com/kubernetes-csi/csi-test/v5/pkg/sanity" "github.com/kubernetes-csi/csi-test/v5/utils" + csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "google.golang.org/grpc" @@ -45,25 +47,35 @@ func createSocketDir() (string, error) { var _ = Describe("Sanity CSI checks", Ordered, func() { var err error var kubeClient *kubernetes.Clientset = &kubernetes.Clientset{} + var nodeID string var endpoint string - var driver *rclone.Driver = &rclone.Driver{} var socketDir string BeforeAll(func() { socketDir, err = createSocketDir() Expect(err).ShouldNot(HaveOccurred()) + nodeID = "hostname" endpoint = fmt.Sprintf("unix://%s/csi.sock", socketDir) kubeClient, err = kube.GetK8sClient() Expect(err).ShouldNot(HaveOccurred()) os.Setenv("DRIVER_NAME", "csi-rclone") - driver = rclone.NewDriver("hostname", endpoint) - cs := rclone.NewControllerServer(driver.CSIDriver) - ns, err := rclone.NewNodeServer(driver.CSIDriver, "", "") - Expect(err).ShouldNot(HaveOccurred()) - driver.WithControllerServer(cs).WithNodeServer(ns) go func() { defer GinkgoRecover() - err := driver.Run() + var nsDoublePointer **rclone.NodeServer + err := rclone.Run(context.Background(), &nodeID, &endpoint, + func(csiDriver *csicommon.CSIDriver) (csi.ControllerServer, csi.NodeServer, error) { + cs := rclone.NewControllerServer(csiDriver) + ns, err := rclone.NewNodeServer(csiDriver, "", "") + if err != nil { + return nil, nil, err + } + nsDoublePointer = &ns + return cs, ns, nil + }, + func(ctx context.Context) error { + return (*nsDoublePointer).Run(ctx) + }, + ) Expect(err).ShouldNot(HaveOccurred()) }() _, err = utils.Connect(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -71,7 +83,6 @@ var _ = Describe("Sanity CSI checks", Ordered, func() { }) AfterAll(func() { - driver.Stop() os.RemoveAll(socketDir) os.Unsetenv("DRIVER_NAME") }) From 6461aad3de9bfd07ebe7c4ad1dbeed5986c247e7 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Fri, 28 Nov 2025 18:09:12 +0100 Subject: [PATCH 09/11] fix: regroup ControllerServer functions --- cmd/csi-rclone-plugin/main.go | 32 +------------------------------- 
pkg/rclone/controllerserver.go | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/cmd/csi-rclone-plugin/main.go b/cmd/csi-rclone-plugin/main.go index 3565240..7c5f3dd 100644 --- a/cmd/csi-rclone-plugin/main.go +++ b/cmd/csi-rclone-plugin/main.go @@ -57,7 +57,7 @@ func main() { Short: "Start the CSI driver.", } exitOnError(NodeCommandLineParameters(runCmd, &meters, &nodeID, &endpoint, &cacheDir, &cacheSize)) - exitOnError(ControllerCommandLineParameters(runCmd, &meters, &nodeID, &endpoint)) + exitOnError(rclone.ControllerCommandLineParameters(runCmd, &meters, &nodeID, &endpoint)) root.AddCommand(runCmd) @@ -123,33 +123,3 @@ func NodeCommandLineParameters(runCmd *cobra.Command, meters *[]metrics.Observab runCmd.AddCommand(runNode) return nil } - -func ControllerCommandLineParameters(runCmd *cobra.Command, meters *[]metrics.Observable, nodeID, endpoint *string) error { - runController := &cobra.Command{ - Use: "controller", - Short: "Start the CSI driver controller.", - RunE: func(cmd *cobra.Command, args []string) error { - return rclone.Run(context.Background(), - nodeID, - endpoint, - func(csiDriver *csicommon.CSIDriver) (csi.ControllerServer, csi.NodeServer, error) { - cs := rclone.NewControllerServer(csiDriver) - *meters = append(*meters, cs.Metrics()...) 
- return cs, nil, nil - }, - func(_ context.Context) error { return nil }, - ) - }, - } - runController.PersistentFlags().StringVar(nodeID, "nodeid", "", "node id") - if err := runController.MarkPersistentFlagRequired("nodeid"); err != nil { - return err - } - runController.PersistentFlags().StringVar(endpoint, "endpoint", "", "CSI endpoint") - if err := runController.MarkPersistentFlagRequired("endpoint"); err != nil { - return err - } - - runCmd.AddCommand(runController) - return nil -} diff --git a/pkg/rclone/controllerserver.go b/pkg/rclone/controllerserver.go index c60e25f..9b633cf 100644 --- a/pkg/rclone/controllerserver.go +++ b/pkg/rclone/controllerserver.go @@ -9,6 +9,7 @@ import ( "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cobra" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog" @@ -32,7 +33,7 @@ func NewControllerServer(csiDriver *csicommon.CSIDriver) *ControllerServer { } } -func (cs *ControllerServer) Metrics() []metrics.Observable { +func (cs *ControllerServer) metrics() []metrics.Observable { var meters []metrics.Observable meter := prometheus.NewGauge(prometheus.GaugeOpts{ @@ -47,6 +48,36 @@ func (cs *ControllerServer) Metrics() []metrics.Observable { return meters } +func ControllerCommandLineParameters(runCmd *cobra.Command, meters *[]metrics.Observable, nodeID, endpoint *string) error { + runController := &cobra.Command{ + Use: "controller", + Short: "Start the CSI driver controller.", + RunE: func(cmd *cobra.Command, args []string) error { + return Run(context.Background(), + nodeID, + endpoint, + func(csiDriver *csicommon.CSIDriver) (csi.ControllerServer, csi.NodeServer, error) { + cs := NewControllerServer(csiDriver) + *meters = append(*meters, cs.metrics()...) 
+ return cs, nil, nil + }, + func(_ context.Context) error { return nil }, + ) + }, + } + runController.PersistentFlags().StringVar(nodeID, "nodeid", "", "node id") + if err := runController.MarkPersistentFlagRequired("nodeid"); err != nil { + return err + } + runController.PersistentFlags().StringVar(endpoint, "endpoint", "", "CSI endpoint") + if err := runController.MarkPersistentFlagRequired("endpoint"); err != nil { + return err + } + + runCmd.AddCommand(runController) + return nil +} + func (cs *ControllerServer) ValidateVolumeCapabilities(_ context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { volId := req.GetVolumeId() if len(volId) == 0 { From 7a9423ddc9102c0a5d9f322b0e7cce32b007c06b Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Fri, 28 Nov 2025 18:13:13 +0100 Subject: [PATCH 10/11] fix: regroup NodeServer functions --- cmd/csi-rclone-plugin/main.go | 42 +---------------------------------- pkg/rclone/nodeserver.go | 41 +++++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 42 deletions(-) diff --git a/cmd/csi-rclone-plugin/main.go b/cmd/csi-rclone-plugin/main.go index 7c5f3dd..bdfc39c 100644 --- a/cmd/csi-rclone-plugin/main.go +++ b/cmd/csi-rclone-plugin/main.go @@ -11,8 +11,6 @@ import ( "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/SwissDataScienceCenter/csi-rclone/pkg/rclone" - "github.com/container-storage-interface/spec/lib/go/csi" - csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common" "github.com/spf13/cobra" "k8s.io/klog" ) @@ -56,7 +54,7 @@ func main() { Use: "run", Short: "Start the CSI driver.", } - exitOnError(NodeCommandLineParameters(runCmd, &meters, &nodeID, &endpoint, &cacheDir, &cacheSize)) + exitOnError(rclone.NodeCommandLineParameters(runCmd, &meters, &nodeID, &endpoint, &cacheDir, &cacheSize)) exitOnError(rclone.ControllerCommandLineParameters(runCmd, &meters, &nodeID, &endpoint)) root.AddCommand(runCmd) @@ -85,41 +83,3 @@ func 
main() { os.Exit(0) } - -func NodeCommandLineParameters(runCmd *cobra.Command, meters *[]metrics.Observable, nodeID, endpoint, cacheDir, cacheSize *string) error { - runNode := &cobra.Command{ - Use: "node", - Short: "Start the CSI driver node service - expected to run in a daemonset on every node.", - RunE: func(cmd *cobra.Command, args []string) error { - //Pointers are passed by value, so we use a pointer to a pointer to be able to retrieve the allocated server in the - //run closure - var nsDoublePointer **rclone.NodeServer - return rclone.Run(context.Background(), nodeID, endpoint, - func(csiDriver *csicommon.CSIDriver) (csi.ControllerServer, csi.NodeServer, error) { - ns, err := rclone.NewNodeServer(csiDriver, *cacheDir, *cacheSize) - if err != nil { - return nil, nil, err - } - *meters = append(*meters, ns.Metrics()...) - *nsDoublePointer = ns - return nil, ns, nil - }, - func(ctx context.Context) error { - return (*nsDoublePointer).Run(ctx) - }, - ) - }, - } - runNode.PersistentFlags().StringVar(nodeID, "nodeid", "", "node id") - if err := runNode.MarkPersistentFlagRequired("nodeid"); err != nil { - return err - } - runNode.PersistentFlags().StringVar(endpoint, "endpoint", "", "CSI endpoint") - if err := runNode.MarkPersistentFlagRequired("endpoint"); err != nil { - return err - } - runNode.PersistentFlags().StringVar(cacheDir, "cachedir", "", "cache dir") - runNode.PersistentFlags().StringVar(cacheSize, "cachesize", "", "cache size") - runCmd.AddCommand(runNode) - return nil -} diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index 56ff32d..18a3f16 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -22,6 +22,7 @@ import ( "github.com/SwissDataScienceCenter/csi-rclone/pkg/metrics" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/fernet/fernet-go" + "github.com/spf13/cobra" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -140,7 +141,7 @@ func (ns 
*NodeServer) Run(ctx context.Context) error { }) } -func (ns *NodeServer) Metrics() []metrics.Observable { +func (ns *NodeServer) metrics() []metrics.Observable { var meters []metrics.Observable // What should we meter? @@ -154,6 +155,44 @@ func (ns *NodeServer) Stop() { } } +func NodeCommandLineParameters(runCmd *cobra.Command, meters *[]metrics.Observable, nodeID, endpoint, cacheDir, cacheSize *string) error { + runNode := &cobra.Command{ + Use: "node", + Short: "Start the CSI driver node service - expected to run in a daemonset on every node.", + RunE: func(cmd *cobra.Command, args []string) error { + //Pointers are passed by value, so we use a pointer to a pointer to be able to retrieve the allocated server in the + //run closure + var nsDoublePointer **NodeServer + return Run(context.Background(), nodeID, endpoint, + func(csiDriver *csicommon.CSIDriver) (csi.ControllerServer, csi.NodeServer, error) { + ns, err := NewNodeServer(csiDriver, *cacheDir, *cacheSize) + if err != nil { + return nil, nil, err + } + *meters = append(*meters, ns.metrics()...) 
+ nsDoublePointer = &ns + return nil, ns, nil + }, + func(ctx context.Context) error { + return (*nsDoublePointer).Run(ctx) + }, + ) + }, + } + runNode.PersistentFlags().StringVar(nodeID, "nodeid", "", "node id") + if err := runNode.MarkPersistentFlagRequired("nodeid"); err != nil { + return err + } + runNode.PersistentFlags().StringVar(endpoint, "endpoint", "", "CSI endpoint") + if err := runNode.MarkPersistentFlagRequired("endpoint"); err != nil { + return err + } + runNode.PersistentFlags().StringVar(cacheDir, "cachedir", "", "cache dir") + runNode.PersistentFlags().StringVar(cacheSize, "cachesize", "", "cache size") + runCmd.AddCommand(runNode) + return nil +} + type MountedVolume struct { VolumeId string TargetPath string From 290b2e06eb05fccd33a5ccdca1d7f61dbb0f5967 Mon Sep 17 00:00:00 2001 From: Lionel Sambuc Date: Mon, 1 Dec 2025 08:52:21 +0100 Subject: [PATCH 11/11] fix: Use tags to specify json field names on permanent storage --- pkg/rclone/nodeserver.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/rclone/nodeserver.go b/pkg/rclone/nodeserver.go index 18a3f16..7dc241e 100644 --- a/pkg/rclone/nodeserver.go +++ b/pkg/rclone/nodeserver.go @@ -194,15 +194,15 @@ func NodeCommandLineParameters(runCmd *cobra.Command, meters *[]metrics.Observab } type MountedVolume struct { - VolumeId string - TargetPath string - Remote string - RemotePath string - ConfigData string - ReadOnly bool - Parameters map[string]string - SecretName string - SecretNamespace string + VolumeId string `json:"volume_id"` + TargetPath string `json:"target_path"` + Remote string `json:"remote"` + RemotePath string `json:"remote_path"` + ConfigData string `json:"config_data"` + ReadOnly bool `json:"read_only"` + Parameters map[string]string `json:"parameters"` + SecretName string `json:"secret_name"` + SecretNamespace string `json:"secret_namespace"` } // Mounting Volume (Preparation)