From a1339faf29fe22ed101fbdb21e03746d05798182 Mon Sep 17 00:00:00 2001 From: Flora Thiebaut Date: Wed, 9 Apr 2025 10:49:28 +0200 Subject: [PATCH 1/2] feat: add support for mount options --- pkg/rclone/rclone.go | 99 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 83 insertions(+), 16 deletions(-) diff --git a/pkg/rclone/rclone.go b/pkg/rclone/rclone.go index 6286e5ed..4cb2a8d5 100644 --- a/pkg/rclone/rclone.go +++ b/pkg/rclone/rclone.go @@ -57,15 +57,62 @@ type MountRequest struct { MountOpt MountOpt `json:"mountOpt"` } +// VfsOpt is options for creating the vfs +// +// Note that the `Daemon` option has been removed as it is not accepted for rc calls. type VfsOpt struct { - CacheMode string `json:"cacheMode"` - DirCacheTime time.Duration `json:"dirCacheTime"` - ReadOnly bool `json:"readOnly"` + NoSeek bool `json:",omitempty"` // don't allow seeking if set + NoChecksum bool `json:",omitempty"` // don't check checksums if set + ReadOnly bool `json:",omitempty"` // if set VFS is read only + NoModTime bool `json:",omitempty"` // don't read mod times for files + DirCacheTime time.Duration `json:",omitempty"` // how long to consider directory listing cache valid + Refresh bool `json:",omitempty"` // refreshes the directory listing recursively on start + PollInterval time.Duration `json:",omitempty"` + Umask int `json:",omitempty"` + UID uint32 `json:",omitempty"` + GID uint32 `json:",omitempty"` + DirPerms os.FileMode `json:",omitempty"` + FilePerms os.FileMode `json:",omitempty"` + ChunkSize int64 `json:",omitempty"` // if > 0 read files in chunks + ChunkSizeLimit int64 `json:",omitempty"` // if > ChunkSize double the chunk size after each chunk until reached + CacheMode string `json:",omitempty"` + CacheMaxAge time.Duration `json:",omitempty"` + CacheMaxSize int64 `json:",omitempty"` + CacheMinFreeSpace int64 `json:",omitempty"` + CachePollInterval time.Duration `json:",omitempty"` + CaseInsensitive bool `json:",omitempty"` + WriteWait time.Duration `json:",omitempty"` // time to wait for in-sequence write + ReadWait time.Duration `json:",omitempty"` // time to wait for in-sequence read + WriteBack time.Duration `json:",omitempty"` // time to wait before writing back dirty files + ReadAhead int64 `json:",omitempty"` // bytes to read ahead in cache mode "full" + UsedIsSize bool `json:",omitempty"` // if true, use the `rclone size` algorithm for Used size + FastFingerprint bool `json:",omitempty"` // if set use fast fingerprints + DiskSpaceTotalSize int64 `json:",omitempty"` } + +// Options for creating the mount +// +// Note that options not supported on Linux have been removed. type MountOpt struct { - AllowNonEmpty bool `json:"allowNonEmpty"` - AllowOther bool `json:"allowOther"` + DebugFUSE bool `json:",omitempty"` + AllowNonEmpty bool `json:",omitempty"` + AllowRoot bool `json:",omitempty"` + AllowOther bool `json:",omitempty"` + DefaultPermissions bool `json:",omitempty"` + WritebackCache bool `json:",omitempty"` + DaemonWait time.Duration `json:",omitempty"` // time to wait for ready mount from daemon, maximum on Linux or constant on macOS/BSD + MaxReadAhead int64 `json:",omitempty"` + ExtraOptions []string `json:",omitempty"` + ExtraFlags []string `json:",omitempty"` + AttrTimeout time.Duration `json:",omitempty"` // how long the kernel caches attribute for + DeviceName string `json:",omitempty"` + VolumeName string `json:",omitempty"` + NoAppleDouble bool `json:",omitempty"` + NoAppleXattr bool `json:",omitempty"` + AsyncRead bool `json:",omitempty"` + CaseInsensitive string `json:",omitempty"` } + type ConfigCreateRequest struct { Name string `json:"name"` Parameters map[string]string `json:"parameters"` @@ -105,7 +152,7 @@ func (r *Rclone) Mount(ctx context.Context, rcloneVolume *RcloneVolume, targetPa Parameters: params, Opt: map[string]interface{}{"obscure": true}, } - klog.Infof("executing create config command args=%v, targetpath=%s", configName, targetPath) + klog.Infof("executing create config command name=%s, storageType=%s", configName, configOpts.StorageType) postBody, err := json.Marshal(configOpts) if err != nil { return fmt.Errorf("mounting failed: couldn't create request body: %s", err) @@ -121,19 +168,39 @@ func (r *Rclone) Mount(ctx context.Context, rcloneVolume *RcloneVolume, targetPa } klog.Infof("created config: %s", configName) + // VFS Mount parameters + vfsOpt := VfsOpt{ + CacheMode: "writes", + DirCacheTime: 60 * time.Second, + } + vfsOptStr := parameters["vfsOpt"] + if vfsOptStr != "" { + err = json.Unmarshal([]byte(vfsOptStr), &vfsOpt) + if err != nil { + return fmt.Errorf("could not parse vfsOpt: %w", err) + } + } + // The `ReadOnly` option is specified in the PVC + vfsOpt.ReadOnly = readOnly + // Mount parameters + mountOpt := MountOpt{ + AllowNonEmpty: true, + AllowOther: true, + } + mountOptStr := parameters["mountOpt"] + if mountOptStr != "" { + err = json.Unmarshal([]byte(mountOptStr), &mountOpt) + if err != nil { + return fmt.Errorf("could not parse mountOpt: %w", err) + } + } + remoteWithPath := fmt.Sprintf("%s:%s", configName, rcloneVolume.RemotePath) mountArgs := MountRequest{ Fs: remoteWithPath, MountPoint: targetPath, - VfsOpt: VfsOpt{ - CacheMode: "writes", - DirCacheTime: 60 * time.Second, - ReadOnly: readOnly, - }, - MountOpt: MountOpt{ - AllowNonEmpty: true, - AllowOther: true, - }, + VfsOpt: vfsOpt, + MountOpt: mountOpt, } // create target, os.Mkdirall is noop if it exists @@ -141,11 +208,11 @@ func (r *Rclone) Mount(ctx context.Context, rcloneVolume *RcloneVolume, targetPa if err != nil { return err } - klog.Infof("executing mount command args=%v, targetpath=%s", mountArgs, targetPath) postBody, err = json.Marshal(mountArgs) if err != nil { return fmt.Errorf("mounting failed: couldn't create request body: %s", err) } + klog.Infof("executing mount command args=%s", string(postBody)) requestBody = bytes.NewBuffer(postBody) resp, err = http.Post(fmt.Sprintf("http://localhost:%d/mount/mount", r.port), "application/json", requestBody) if err != nil { From f07c61669169e5e72dfb517da74367d65169b2d9 Mon Sep 17 00:00:00 2001 From: Flora Thiebaut Date: Wed, 9 Apr 2025 13:20:09 +0200 Subject: [PATCH 2/2] feat: configure cache dir for rclone --- cmd/csi-rclone-plugin/main.go | 4 +++- deploy/csi-rclone/templates/csi-nodeplugin-rclone.yaml | 8 ++++++++ deploy/csi-rclone/values.yaml | 3 +++ pkg/rclone/driver.go | 4 ++-- pkg/rclone/rclone.go | 9 +++++++-- 5 files changed, 23 insertions(+), 5 deletions(-) diff --git a/cmd/csi-rclone-plugin/main.go b/cmd/csi-rclone-plugin/main.go index e6df222e..0952469a 100644 --- a/cmd/csi-rclone-plugin/main.go +++ b/cmd/csi-rclone-plugin/main.go @@ -15,6 +15,7 @@ import ( var ( endpoint string nodeID string + cacheDir string ) func init() { @@ -45,6 +46,7 @@ func main() { runNode.MarkPersistentFlagRequired("nodeid") runNode.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint") runNode.MarkPersistentFlagRequired("endpoint") + runNode.PersistentFlags().StringVar(&cacheDir, "cachedir", "", "cache dir") runCmd.AddCommand(runNode) runController := &cobra.Command{ Use: "controller", @@ -83,7 +85,7 @@ func handleNode() { klog.Warningf("There was an error when trying to unmount old volumes: %v", err) } d := rclone.NewDriver(nodeID, endpoint) - ns, err := rclone.NewNodeServer(d.CSIDriver) + ns, err := rclone.NewNodeServer(d.CSIDriver, cacheDir) if err != nil { panic(err) } diff --git a/deploy/csi-rclone/templates/csi-nodeplugin-rclone.yaml b/deploy/csi-rclone/templates/csi-nodeplugin-rclone.yaml index 5ce0ad65..8c0ea7d7 100644 --- a/deploy/csi-rclone/templates/csi-nodeplugin-rclone.yaml +++ b/deploy/csi-rclone/templates/csi-nodeplugin-rclone.yaml @@ -63,6 +63,7 @@ spec: - node - --nodeid=$(NODE_ID) - --endpoint=$(CSI_ENDPOINT) + - --cachedir=$(CACHE_DIR) env: - name: NODE_ID valueFrom: @@ -76,6 +77,8 @@ spec: value: {{ .Values.storageClassName | quote}} - name: LOG_LEVEL value: {{ .Values.logLevel | default "NOTICE" | quote }} + - name: CACHE_DIR + value: {{ .Values.csiNodepluginRclone.rclone.cache.dir | quote }} image: {{ .Values.csiNodepluginRclone.rclone.image.repository }}:{{ .Values.csiNodepluginRclone.rclone.image.tag | default .Chart.AppVersion }} imagePullPolicy: {{ .Values.csiNodepluginRclone.rclone.imagePullPolicy }} resources: @@ -100,6 +103,8 @@ spec: - mountPath: /var/lib/kubelet/pods mountPropagation: Bidirectional name: pods-mount-dir + - mountPath: /var/lib/rclone + name: cache-dir {{- with .Values.csiNodepluginRclone.nodeSelector }} nodeSelector: {{ toYaml . | nindent 8 }} @@ -125,3 +130,6 @@ spec: path: {{ .Values.kubeletDir }}/plugins_registry type: DirectoryOrCreate name: registration-dir + - name: cache-dir + emptyDir: + sizeLimit: 1G diff --git a/deploy/csi-rclone/values.yaml b/deploy/csi-rclone/values.yaml index 01be2183..93f7a33f 100644 --- a/deploy/csi-rclone/values.yaml +++ b/deploy/csi-rclone/values.yaml @@ -52,6 +52,9 @@ csiNodepluginRclone: # requests: # cpu: 100m # memory: 128M + cache: + # The location of the cache directory + dir: /var/lib/rclone/cache serviceAccount: annotations: {} nodeSelector: {} diff --git a/pkg/rclone/driver.go b/pkg/rclone/driver.go index a5e7a4f7..189077fc 100644 --- a/pkg/rclone/driver.go +++ b/pkg/rclone/driver.go @@ -64,7 +64,7 @@ func NewDriver(nodeID, endpoint string) *Driver { return d } -func NewNodeServer(csiDriver *csicommon.CSIDriver) (*nodeServer, error) { +func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string) (*nodeServer, error) { kubeClient, err := kube.GetK8sClient() if err != nil { return nil, err @@ -74,7 +74,7 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver) (*nodeServer, error) { if err != nil { return nil, fmt.Errorf("Cannot get a free TCP port to run rclone") } - rcloneOps := NewRclone(kubeClient, rclonePort) + rcloneOps := NewRclone(kubeClient, rclonePort, cacheDir) return &nodeServer{ DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver), diff --git a/pkg/rclone/rclone.go b/pkg/rclone/rclone.go index 4cb2a8d5..d3edcda0 100644 --- a/pkg/rclone/rclone.go +++ b/pkg/rclone/rclone.go @@ -43,6 +43,7 @@ type Rclone struct { kubeClient *kubernetes.Clientset daemonCmd *os_exec.Cmd port int + cacheDir string } type RcloneVolume struct { @@ -349,11 +350,12 @@ func (r Rclone) GetVolumeById(ctx context.Context, volumeId string) (*RcloneVolu return nil, ErrVolumeNotFound } -func NewRclone(kubeClient *kubernetes.Clientset, port int) Operations { +func NewRclone(kubeClient *kubernetes.Clientset, port int, cacheDir string) Operations { rclone := &Rclone{ execute: exec.New(), kubeClient: kubeClient, port: port, + cacheDir: cacheDir, } return rclone } @@ -415,13 +417,16 @@ func (r *Rclone) start_daemon() error { rclone_args = append(rclone_args, "--cache-info-age=72h") rclone_args = append(rclone_args, "--cache-chunk-clean-interval=15m") rclone_args = append(rclone_args, "--rc-no-auth") + if r.cacheDir != "" { + rclone_args = append(rclone_args, fmt.Sprintf("--cache-dir=%s", r.cacheDir)) + } loglevel := os.Getenv("LOG_LEVEL") if len(loglevel) == 0 { loglevel = "NOTICE" } rclone_args = append(rclone_args, fmt.Sprintf("--log-level=%s", loglevel)) rclone_args = append(rclone_args, fmt.Sprintf("--config=%s", f.Name())) - klog.Infof("running rclone remote control daemon cmd=%s, args=%s, ", rclone_cmd, rclone_args) + klog.Infof("running rclone remote control daemon cmd=%s, args=%s", rclone_cmd, rclone_args) env := os.Environ() cmd := os_exec.Command(rclone_cmd, rclone_args...)