Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/csi-rclone-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
var (
endpoint string
nodeID string
cacheDir string
)

func init() {
Expand Down Expand Up @@ -45,6 +46,7 @@ func main() {
runNode.MarkPersistentFlagRequired("nodeid")
runNode.PersistentFlags().StringVar(&endpoint, "endpoint", "", "CSI endpoint")
runNode.MarkPersistentFlagRequired("endpoint")
runNode.PersistentFlags().StringVar(&cacheDir, "cachedir", "", "cache dir")
runCmd.AddCommand(runNode)
runController := &cobra.Command{
Use: "controller",
Expand Down Expand Up @@ -83,7 +85,7 @@ func handleNode() {
klog.Warningf("There was an error when trying to unmount old volumes: %v", err)
}
d := rclone.NewDriver(nodeID, endpoint)
ns, err := rclone.NewNodeServer(d.CSIDriver)
ns, err := rclone.NewNodeServer(d.CSIDriver, cacheDir)
if err != nil {
panic(err)
}
Expand Down
8 changes: 8 additions & 0 deletions deploy/csi-rclone/templates/csi-nodeplugin-rclone.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ spec:
- node
- --nodeid=$(NODE_ID)
- --endpoint=$(CSI_ENDPOINT)
- --cachedir=$(CACHE_DIR)
env:
- name: NODE_ID
valueFrom:
Expand All @@ -76,6 +77,8 @@ spec:
value: {{ .Values.storageClassName | quote}}
- name: LOG_LEVEL
value: {{ .Values.logLevel | default "NOTICE" | quote }}
- name: CACHE_DIR
value: {{ .Values.csiNodepluginRclone.rclone.cache.dir | quote }}
image: {{ .Values.csiNodepluginRclone.rclone.image.repository }}:{{ .Values.csiNodepluginRclone.rclone.image.tag | default .Chart.AppVersion }}
imagePullPolicy: {{ .Values.csiNodepluginRclone.rclone.imagePullPolicy }}
resources:
Expand All @@ -100,6 +103,8 @@ spec:
- mountPath: /var/lib/kubelet/pods
mountPropagation: Bidirectional
name: pods-mount-dir
- mountPath: /var/lib/rclone
name: cache-dir
{{- with .Values.csiNodepluginRclone.nodeSelector }}
nodeSelector:
{{ toYaml . | nindent 8 }}
Expand All @@ -125,3 +130,6 @@ spec:
path: {{ .Values.kubeletDir }}/plugins_registry
type: DirectoryOrCreate
name: registration-dir
- name: cache-dir
emptyDir:
sizeLimit: 1G
3 changes: 3 additions & 0 deletions deploy/csi-rclone/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ csiNodepluginRclone:
# requests:
# cpu: 100m
# memory: 128M
cache:
# The location of the cache directory
dir: /var/lib/rclone/cache
serviceAccount:
annotations: {}
nodeSelector: {}
Expand Down
4 changes: 2 additions & 2 deletions pkg/rclone/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewDriver(nodeID, endpoint string) *Driver {
return d
}

func NewNodeServer(csiDriver *csicommon.CSIDriver) (*nodeServer, error) {
func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string) (*nodeServer, error) {
kubeClient, err := kube.GetK8sClient()
if err != nil {
return nil, err
Expand All @@ -74,7 +74,7 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver) (*nodeServer, error) {
if err != nil {
return nil, fmt.Errorf("Cannot get a free TCP port to run rclone")
}
rcloneOps := NewRclone(kubeClient, rclonePort)
rcloneOps := NewRclone(kubeClient, rclonePort, cacheDir)

return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver),
Expand Down
108 changes: 90 additions & 18 deletions pkg/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Rclone struct {
kubeClient *kubernetes.Clientset
daemonCmd *os_exec.Cmd
port int
cacheDir string
}

type RcloneVolume struct {
Expand All @@ -57,15 +58,62 @@ type MountRequest struct {
MountOpt MountOpt `json:"mountOpt"`
}

// VfsOpt is options for creating the vfs
//
// Note that the `Daemon` option has been removed as it is not accepted for rc calls.
type VfsOpt struct {
CacheMode string `json:"cacheMode"`
DirCacheTime time.Duration `json:"dirCacheTime"`
ReadOnly bool `json:"readOnly"`
NoSeek bool `json:",omitempty"` // don't allow seeking if set
NoChecksum bool `json:",omitempty"` // don't check checksums if set
ReadOnly bool `json:",omitempty"` // if set VFS is read only
NoModTime bool `json:",omitempty"` // don't read mod times for files
DirCacheTime time.Duration `json:",omitempty"` // how long to consider directory listing cache valid
Refresh bool `json:",omitempty"` // refreshes the directory listing recursively on start
PollInterval time.Duration `json:",omitempty"`
Umask int `json:",omitempty"`
UID uint32 `json:",omitempty"`
GID uint32 `json:",omitempty"`
DirPerms os.FileMode `json:",omitempty"`
FilePerms os.FileMode `json:",omitempty"`
ChunkSize int64 `json:",omitempty"` // if > 0 read files in chunks
ChunkSizeLimit int64 `json:",omitempty"` // if > ChunkSize double the chunk size after each chunk until reached
CacheMode string `json:",omitempty"`
CacheMaxAge time.Duration `json:",omitempty"`
CacheMaxSize int64 `json:",omitempty"`
CacheMinFreeSpace int64 `json:",omitempty"`
CachePollInterval time.Duration `json:",omitempty"`
CaseInsensitive bool `json:",omitempty"`
WriteWait time.Duration `json:",omitempty"` // time to wait for in-sequence write
ReadWait time.Duration `json:",omitempty"` // time to wait for in-sequence read
WriteBack time.Duration `json:",omitempty"` // time to wait before writing back dirty files
ReadAhead int64 `json:",omitempty"` // bytes to read ahead in cache mode "full"
UsedIsSize bool `json:",omitempty"` // if true, use the `rclone size` algorithm for Used size
FastFingerprint bool `json:",omitempty"` // if set use fast fingerprints
DiskSpaceTotalSize int64 `json:",omitempty"`
}

// Options for creating the mount
//
// Note that options not supported on Linux have been removed.
type MountOpt struct {
AllowNonEmpty bool `json:"allowNonEmpty"`
AllowOther bool `json:"allowOther"`
DebugFUSE bool `json:",omitempty"`
AllowNonEmpty bool `json:",omitempty"`
AllowRoot bool `json:",omitempty"`
AllowOther bool `json:",omitempty"`
DefaultPermissions bool `json:",omitempty"`
WritebackCache bool `json:",omitempty"`
DaemonWait time.Duration `json:",omitempty"` // time to wait for ready mount from daemon, maximum on Linux or constant on macOS/BSD
MaxReadAhead int64 `json:",omitempty"`
ExtraOptions []string `json:",omitempty"`
ExtraFlags []string `json:",omitempty"`
AttrTimeout time.Duration `json:",omitempty"` // how long the kernel caches attribute for
DeviceName string `json:",omitempty"`
VolumeName string `json:",omitempty"`
NoAppleDouble bool `json:",omitempty"`
NoAppleXattr bool `json:",omitempty"`
AsyncRead bool `json:",omitempty"`
CaseInsensitive string `json:",omitempty"`
}

type ConfigCreateRequest struct {
Name string `json:"name"`
Parameters map[string]string `json:"parameters"`
Expand Down Expand Up @@ -105,7 +153,7 @@ func (r *Rclone) Mount(ctx context.Context, rcloneVolume *RcloneVolume, targetPa
Parameters: params,
Opt: map[string]interface{}{"obscure": true},
}
klog.Infof("executing create config command args=%v, targetpath=%s", configName, targetPath)
klog.Infof("executing create config command name=%s, storageType=%s", configName, configOpts.StorageType)
postBody, err := json.Marshal(configOpts)
if err != nil {
return fmt.Errorf("mounting failed: couldn't create request body: %s", err)
Expand All @@ -121,31 +169,51 @@ func (r *Rclone) Mount(ctx context.Context, rcloneVolume *RcloneVolume, targetPa
}
klog.Infof("created config: %s", configName)

// VFS Mount parameters
vfsOpt := VfsOpt{
CacheMode: "writes",
DirCacheTime: 60 * time.Second,
}
vfsOptStr := parameters["vfsOpt"]
if vfsOptStr != "" {
err = json.Unmarshal([]byte(vfsOptStr), &vfsOpt)
if err != nil {
return fmt.Errorf("could not parse vfsOpt: %w", err)
}
}
// The `ReadOnly` option is specified in the PVC
vfsOpt.ReadOnly = readOnly
// Mount parameters
mountOpt := MountOpt{
AllowNonEmpty: true,
AllowOther: true,
}
mountOptStr := parameters["mountOpt"]
if mountOptStr != "" {
err = json.Unmarshal([]byte(mountOptStr), &mountOpt)
if err != nil {
return fmt.Errorf("could not parse mountOpt: %w", err)
}
}

remoteWithPath := fmt.Sprintf("%s:%s", configName, rcloneVolume.RemotePath)
mountArgs := MountRequest{
Fs: remoteWithPath,
MountPoint: targetPath,
VfsOpt: VfsOpt{
CacheMode: "writes",
DirCacheTime: 60 * time.Second,
ReadOnly: readOnly,
},
MountOpt: MountOpt{
AllowNonEmpty: true,
AllowOther: true,
},
VfsOpt: vfsOpt,
MountOpt: mountOpt,
}

// create target, os.Mkdirall is noop if it exists
err = os.MkdirAll(targetPath, 0750)
if err != nil {
return err
}
klog.Infof("executing mount command args=%v, targetpath=%s", mountArgs, targetPath)
postBody, err = json.Marshal(mountArgs)
if err != nil {
return fmt.Errorf("mounting failed: couldn't create request body: %s", err)
}
klog.Infof("executing mount command args=%s", string(postBody))
requestBody = bytes.NewBuffer(postBody)
resp, err = http.Post(fmt.Sprintf("http://localhost:%d/mount/mount", r.port), "application/json", requestBody)
if err != nil {
Expand Down Expand Up @@ -282,11 +350,12 @@ func (r Rclone) GetVolumeById(ctx context.Context, volumeId string) (*RcloneVolu
return nil, ErrVolumeNotFound
}

func NewRclone(kubeClient *kubernetes.Clientset, port int) Operations {
func NewRclone(kubeClient *kubernetes.Clientset, port int, cacheDir string) Operations {
rclone := &Rclone{
execute: exec.New(),
kubeClient: kubeClient,
port: port,
cacheDir: cacheDir,
}
return rclone
}
Expand Down Expand Up @@ -348,13 +417,16 @@ func (r *Rclone) start_daemon() error {
rclone_args = append(rclone_args, "--cache-info-age=72h")
rclone_args = append(rclone_args, "--cache-chunk-clean-interval=15m")
rclone_args = append(rclone_args, "--rc-no-auth")
if r.cacheDir != "" {
rclone_args = append(rclone_args, fmt.Sprintf("--cache-dir=%s", r.cacheDir))
}
loglevel := os.Getenv("LOG_LEVEL")
if len(loglevel) == 0 {
loglevel = "NOTICE"
}
rclone_args = append(rclone_args, fmt.Sprintf("--log-level=%s", loglevel))
rclone_args = append(rclone_args, fmt.Sprintf("--config=%s", f.Name()))
klog.Infof("running rclone remote control daemon cmd=%s, args=%s, ", rclone_cmd, rclone_args)
klog.Infof("running rclone remote control daemon cmd=%s, args=%s", rclone_cmd, rclone_args)

env := os.Environ()
cmd := os_exec.Command(rclone_cmd, rclone_args...)
Expand Down
Loading