diff --git a/pkg/mounter/cmd_mounter.go b/pkg/mounter/cmd_mounter.go index b603b651e..3f8fb1eef 100644 --- a/pkg/mounter/cmd_mounter.go +++ b/pkg/mounter/cmd_mounter.go @@ -7,41 +7,32 @@ import ( "os/exec" "time" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "k8s.io/mount-utils" ) const timeout = time.Second * 10 type OssCmdMounter struct { - mount.Interface - volumeId string execPath string + volumeID string + mount.Interface } -func NewOssCmdMounter(execPath, volumeId string, inner mount.Interface) Mounter { +var _ Mounter = &OssCmdMounter{} + +func NewOssCmdMounter(execPath, volumeID string, inner mount.Interface) Mounter { return &OssCmdMounter{ execPath: execPath, - volumeId: volumeId, + volumeID: volumeID, Interface: inner, } } -func (m *OssCmdMounter) ExtendedMount(source, target, fstype string, options []string, params *ExtendedMountParams) error { +func (m *OssCmdMounter) ExtendedMount(_ context.Context, op *MountOperation) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(timeout)) defer cancel() - // Parameters in ExtendedMountParams are optional - if params == nil { - params = &ExtendedMountParams{} - } - passwd, err := utils.SaveOssSecretsToFile(params.Secrets, fstype) - if err != nil { - return err - } - - args := getArgs(source, target, fstype, passwd, options) - cmd := exec.CommandContext(ctx, m.execPath, args...) + cmd := exec.CommandContext(ctx, m.execPath, getArgs(op)...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr @@ -51,20 +42,21 @@ func (m *OssCmdMounter) ExtendedMount(source, target, fstype string, options []s return nil } -func getArgs(source, target, fstype, passwdFile string, options []string) []string { - if fstype == "ossfs" { - if passwdFile != "" { - options = append(options, "passwd_file="+passwdFile) - } - return mount.MakeMountArgs(source, target, "", options) +func getArgs(op *MountOperation) []string { + if op == nil { + return nil } - // ossfs2 - args := []string{"mount", target} - if passwdFile != "" { - args = append(args, []string{"-c", passwdFile}...) - } - for _, o := range options { - args = append(args, fmt.Sprintf("--%s", o)) + switch op.FsType { + case "ossfs": + return mount.MakeMountArgs(op.Source, op.Target, "", op.Options) + case "ossfs2": + args := []string{"mount", op.Target} + args = append(args, op.Args...) + for _, o := range op.Options { + args = append(args, fmt.Sprintf("--%s", o)) + } + return args + default: + return nil } - return args } diff --git a/pkg/mounter/interceptors/alinas_secret.go b/pkg/mounter/interceptors/alinas_secret.go new file mode 100644 index 000000000..e3a0333e6 --- /dev/null +++ b/pkg/mounter/interceptors/alinas_secret.go @@ -0,0 +1,56 @@ +package interceptors + +import ( + "context" + "fmt" + "os" + "path" + + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "k8s.io/klog/v2" +) + +var _ mounter.MountInterceptor = AlinasSecretInterceptor + +func AlinasSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error { + if op == nil || op.Secrets == nil { + return handler(ctx, op) + } + + tmpCredFile, err := os.CreateTemp("", op.VolumeID+"-*.credentials") + if err != nil { + return err + } + defer func() { + if err = os.Remove(tmpCredFile.Name()); err != nil && !os.IsNotExist(err) { + klog.ErrorS(err, "Failed to remove temporary alinas credential file", "path", tmpCredFile.Name()) + } + }() + + credFileContent := makeCredFileContent(op.Secrets) + if _, err = tmpCredFile.Write(credFileContent); err != nil { + return err + } + if err = tmpCredFile.Close(); err != nil { + return err + } + + credFilePath := path.Join(os.TempDir(), op.VolumeID+".credentials") + if err = os.Rename(tmpCredFile.Name(), credFilePath); err != nil { + return err + } + + klog.V(4).InfoS("Created alinas credential file", "path", credFilePath) + op.Options = append(op.Options, "ram_config_file="+credFilePath) + + return handler(ctx, op) +} + +func makeCredFileContent(secrets map[string]string) []byte { + return fmt.Appendf( + nil, + "[NASCredentials]\naccessKeyID=%s\naccessKeySecret=%s", + secrets["akId"], + secrets["akSecret"], + ) +} diff --git a/pkg/mounter/interceptors/ossfs_monitor.go b/pkg/mounter/interceptors/ossfs_monitor.go new file mode 100644 index 000000000..50e2743ac --- /dev/null +++ b/pkg/mounter/interceptors/ossfs_monitor.go @@ -0,0 +1,65 @@ +package interceptors + +import ( + "context" + "errors" + + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" + "k8s.io/klog/v2" + "k8s.io/mount-utils" +) + +var _ mounter.MountInterceptor = OssfsMonitorInterceptor + +var ( + raw = mount.NewWithoutSystemd("") + monitorManager = server.NewMountMonitorManager() +) + +func OssfsMonitorInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error { + if op == nil || op.MetricsPath == "" { + return handler(ctx, op) + } + + // Get or create monitor for this target + monitor, found := monitorManager.GetMountMonitor(op.Target, op.MetricsPath, raw, true) + if monitor == nil { + klog.ErrorS(errors.New("failed to get mount monitor"), "stop monitoring mountpoint status", "mountpoint", op.Target) + return handler(ctx, op) + } + if found { + monitor.IncreaseMountRetryCount() + } + + err := handler(ctx, op) + + // Immediate process-exit handling during mount attempt + // Assume the process exits with no error upon receiving SIGTERM, + // and exits with an error in case of unexpected failures. + monitor.HandleMountFailureOrExit(err) + + if op.MountResult == nil { + return err + } + + res, ok := op.MountResult.(server.OssfsMountResult) + if !ok { + klog.ErrorS(errors.New("failed to assert ossfs mount result type"), "skipping monitoring of mountpoint", "mountpoint", op.Target) + return err + } + + go func() { + err := <-res.ExitChan + monitor.HandleMountFailureOrExit(err) + }() + + if err != nil { + return err + } + + monitor.HandleMountSuccess(res.PID) + // Start monitoring goroutine (ticker based only) + monitorManager.StartMonitoring(op.Target) + return nil +} diff --git a/pkg/mounter/interceptors/ossfs_secret.go b/pkg/mounter/interceptors/ossfs_secret.go new file mode 100644 index 000000000..c6299e5e2 --- /dev/null +++ b/pkg/mounter/interceptors/ossfs_secret.go @@ -0,0 +1,60 @@ +package interceptors + +import ( + "context" + "errors" + "os" + + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" + "k8s.io/klog/v2" +) + +var _ mounter.MountInterceptor = OssfsSecretInterceptor + +func OssfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error { + return ossfsSecretInterceptor(ctx, op, handler, "ossfs") +} + +func Ossfs2SecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error { + return ossfsSecretInterceptor(ctx, op, handler, "ossfs2") +} + +func ossfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler, fuseType string) error { + passwdFile, err := utils.SaveOssSecretsToFile(op.Secrets, op.FsType) + if err != nil { + return err + } + if passwdFile != "" { + if fuseType == "ossfs" { + op.Args = append(op.Args, "passwd_file="+passwdFile) + } else { + op.Args = append(op.Args, []string{"-c", passwdFile}...) + } + } + + if err = handler(ctx, op); err != nil { + return err + } + + if passwdFile == "" || op.MountResult == nil { + return nil + } + result, ok := op.MountResult.(*server.OssfsMountResult) + if !ok { + klog.ErrorS( + errors.New("failed to assert ossfs mount result"), + "skipping cleanup of passwd file", "mountpoint", op.Target, "path", passwdFile, + ) + return nil + } + + go func() { + <-result.ExitChan + if err := os.Remove(passwdFile); err != nil { + klog.ErrorS(err, "failed to cleanup ossfs passwd file", "mountpoint", op.Target, "path", passwdFile) + } + }() + return nil +} diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go index 3596abf8f..fe56cc99a 100644 --- a/pkg/mounter/mounter.go +++ b/pkg/mounter/mounter.go @@ -1,15 +1,69 @@ package mounter import ( + "context" + mountutils "k8s.io/mount-utils" ) -type ExtendedMountParams struct { +type Mounter interface { + mountutils.Interface + ExtendedMount(ctx context.Context, op *MountOperation) error +} + +type MountOperation struct { + Source string + Target string + FsType string + Options []string + Args []string Secrets map[string]string MetricsPath string + VolumeID string + + MountResult any } -type Mounter interface { - mountutils.Interface - ExtendedMount(source, target, fstype string, options []string, params *ExtendedMountParams) error +type MountHandler func(ctx context.Context, op *MountOperation) error + +type MountInterceptor func(ctx context.Context, op *MountOperation, handler MountHandler) error + +type MountWorkflow struct { + Mounter + chainedHandler MountHandler +} + +var _ Mounter = &MountWorkflow{} + +func (w *MountWorkflow) ExtendedMount(ctx context.Context, op *MountOperation) error { + return w.chainedHandler(ctx, op) +} + +// chainInterceptors creates a chain of interceptors similar to gRPC +func chainInterceptors(interceptors []MountInterceptor, finalHandler MountHandler) MountHandler { + if len(interceptors) == 0 { + return finalHandler + } + + return func(ctx context.Context, op *MountOperation) error { + return interceptors[0](ctx, op, getChainHandler(interceptors, 0, finalHandler)) + } +} + +// getChainHandler creates a handler that chains interceptors recursively +func getChainHandler(interceptors []MountInterceptor, curr int, finalHandler MountHandler) MountHandler { + if curr == len(interceptors)-1 { + return finalHandler + } + + return func(ctx context.Context, op *MountOperation) error { + return interceptors[curr+1](ctx, op, getChainHandler(interceptors, curr+1, finalHandler)) + } +} + +func NewForMounter(m Mounter, interceptors ...MountInterceptor) Mounter { + return &MountWorkflow{ + Mounter: m, + chainedHandler: chainInterceptors(interceptors, m.ExtendedMount), + } } diff --git a/pkg/mounter/proxy/protocol.go b/pkg/mounter/proxy/protocol.go index 5d94a4495..9585334f8 100644 --- a/pkg/mounter/proxy/protocol.go +++ b/pkg/mounter/proxy/protocol.go @@ -43,4 +43,5 @@ type MountRequest struct { MountFlags []string `json:"mountFlags,omitempty"` Secrets map[string]string `json:"secrets,omitempty"` MetricsPath string `json:"metricsPath,omitempty"` + VolumeID string `json:"volumeID,omitempty"` } diff --git a/pkg/mounter/proxy/server/alinas/driver.go b/pkg/mounter/proxy/server/alinas/driver.go index 255b43e2a..b14a3bee1 100644 --- a/pkg/mounter/proxy/server/alinas/driver.go +++ b/pkg/mounter/proxy/server/alinas/driver.go @@ -11,9 +11,11 @@ import ( "syscall" "time" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/interceptors" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - mounter "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" + mounterutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -25,11 +27,16 @@ const ( ) func init() { - server.RegisterDriver(&Driver{mounter: mount.New("")}) + server.RegisterDriver(&Driver{ + Mounter: mounter.NewForMounter( + &extendedMounter{Interface: mount.New("")}, + interceptors.AlinasSecretInterceptor, + ), + }) } type Driver struct { - mounter mount.Interface + mounter.Mounter } func (h *Driver) Name() string { @@ -41,15 +48,15 @@ func (h *Driver) Fstypes() []string { } func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { - klog.InfoS("Mounting", "fstype", req.Fstype, "source", req.Source, "target", req.Target, "options", req.Options) - options := append(req.Options, "no_start_watchdog") - if req.Fstype == fstypeAlinas { - // options = append(options, "no_atomic_move", "auto_fallback_nfs") - options = append(options, "no_atomic_move") - options = addAutoFallbackNFSMountOptions(options) - } - - return h.mounter.Mount(req.Source, req.Target, req.Fstype, options) + return h.ExtendedMount(ctx, &mounter.MountOperation{ + Source: req.Source, + Target: req.Target, + FsType: req.Fstype, + Options: req.Options, + Secrets: req.Secrets, + MetricsPath: req.MetricsPath, + VolumeID: req.VolumeID, + }) } func (h *Driver) Init() { @@ -79,7 +86,7 @@ func addAutoFallbackNFSMountOptions(mountOptions []string) []string { isEFC := false isVSC := false for _, options := range mountOptions { - for _, option := range mounter.SplitMountOptions(options) { + for _, option := range mounterutils.SplitMountOptions(options) { if option == "" { continue } @@ -158,3 +165,20 @@ func copyFile(src, dst string) error { return dstFile.Sync() } + +type extendedMounter struct { + mount.Interface +} + +var _ mounter.Mounter = &extendedMounter{} + +func (m *extendedMounter) ExtendedMount(ctx context.Context, op *mounter.MountOperation) error { + klog.InfoS("Mounting", "fstype", op.FsType, "source", op.Source, "target", op.Target, "options", op.Options) + op.Options = append(op.Options, "no_start_watchdog") + if op.FsType == fstypeAlinas { + // options = append(options, "no_atomic_move", "auto_fallback_nfs") + op.Options = append(op.Options, "no_atomic_move") + op.Options = addAutoFallbackNFSMountOptions(op.Options) + } + return m.Mount(op.Source, op.Target, op.FsType, op.Options) +} diff --git a/pkg/mounter/proxy/server/metrics.go b/pkg/mounter/proxy/server/metrics.go index 0a9523f3f..725c13ff5 100644 --- a/pkg/mounter/proxy/server/metrics.go +++ b/pkg/mounter/proxy/server/metrics.go @@ -3,7 +3,6 @@ package server import ( "fmt" "os" - "os/exec" "path/filepath" "strconv" "sync" @@ -125,15 +124,15 @@ func (m *MountMonitor) HandleMountFailureOrExit(err error) { } // HandleMountSuccess handles the case when mount operation succeeds -func (m *MountMonitor) HandleMountSuccess(process *exec.Cmd) { +func (m *MountMonitor) HandleMountSuccess(pid int) { m.mu.Lock() defer m.mu.Unlock() // Update metrics for mount success m.updateMountPointMetrics(&m.retryCount, nil, nil) + m.Pid = pid m.State = MonitorStateMonitoring - m.Pid = process.Process.Pid klog.InfoS("Mount succeeded", "target", m.Target, "pid", m.Pid) } diff --git a/pkg/mounter/proxy/server/mount_result.go b/pkg/mounter/proxy/server/mount_result.go new file mode 100644 index 000000000..538722182 --- /dev/null +++ b/pkg/mounter/proxy/server/mount_result.go @@ -0,0 +1,6 @@ +package server + +type OssfsMountResult struct { + PID int + ExitChan chan error +} diff --git a/pkg/mounter/proxy/server/ossfs/driver.go b/pkg/mounter/proxy/server/ossfs/driver.go index 31812a552..64724bafb 100644 --- a/pkg/mounter/proxy/server/ossfs/driver.go +++ b/pkg/mounter/proxy/server/ossfs/driver.go @@ -14,10 +14,10 @@ import ( "golang.org/x/sys/unix" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/interceptors" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - serverutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -29,18 +29,27 @@ func init() { // Driver manages ossfs mounts and their monitoring type Driver struct { + mounter.Mounter pids *sync.Map monitorManager *server.MountMonitorManager wg sync.WaitGroup - raw mount.Interface } func NewDriver() *Driver { - return &Driver{ + driver := &Driver{ pids: new(sync.Map), monitorManager: server.NewMountMonitorManager(), - raw: mount.NewWithoutSystemd(""), } + m := &extendedMounter{ + driver: driver, + Interface: mount.NewWithoutSystemd(""), + } + driver.Mounter = mounter.NewForMounter( + m, + interceptors.OssfsSecretInterceptor, + interceptors.OssfsMonitorInterceptor, + ) + return driver } func (h *Driver) Name() string { @@ -52,38 +61,57 @@ func (h *Driver) Fstypes() []string { } func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { - options := req.Options - target := req.Target - - // Get or create monitor for this target - var monitor *server.MountMonitor - if req.MetricsPath != "" { - var found bool - monitor, found = h.monitorManager.GetMountMonitor(target, req.MetricsPath, h.raw, true) - if monitor == nil { - klog.Errorf("Failed to get mount monitor for %s, stop monitoring mountpoint status", target) - } else if found { - monitor.IncreaseMountRetryCount() - } - } + return h.ExtendedMount(ctx, &mounter.MountOperation{ + Source: req.Source, + Target: req.Target, + FsType: req.Fstype, + Options: req.Options, + Secrets: req.Secrets, + MetricsPath: req.MetricsPath, + VolumeID: req.VolumeID, + }) +} - // prepare passwd file - passwdFile, err := utils.SaveOssSecretsToFile(req.Secrets, req.Fstype) - if err != nil { - if monitor != nil { - monitor.HandleMountFailureOrExit(err) +func (h *Driver) Init() {} + +func (h *Driver) Terminate() { + // Stop all mount monitoring + h.monitorManager.StopAllMonitoring() + + // terminate all running ossfs + h.pids.Range(func(key, value any) bool { + err := value.(*exec.Cmd).Process.Signal(syscall.SIGTERM) + if err != nil { + klog.ErrorS(err, "Failed to terminate ossfs", "pid", key) } - return err - } - options = append(options, "passwd_file="+passwdFile) + klog.V(4).InfoS("Sended sigterm", "pid", key) + return true + }) + + // wait all ossfs processes and monitoring goroutines to exit + h.monitorManager.WaitForAllMonitoring() + h.wg.Wait() + klog.InfoS("All ossfs processes and monitoring goroutines exited") +} + +type extendedMounter struct { + driver *Driver + mount.Interface +} + +var _ mounter.Mounter = &extendedMounter{} - args := mount.MakeMountArgs(req.Source, req.Target, "", options) - args = append(args, req.MountFlags...) +func (m *extendedMounter) ExtendedMount(ctx context.Context, op *mounter.MountOperation) error { + options := op.Options + target := op.Target + + args := mount.MakeMountArgs(op.Source, op.Target, "", options) + args = append(args, op.Args...) args = append(args, "-f") var stderrBuf bytes.Buffer multiWriter := io.MultiWriter(os.Stderr, &stderrBuf) - sw := serverutils.NewSwitchableWriter(multiWriter) + sw := server.NewSwitchableWriter(multiWriter) cmd := exec.Command("ossfs", args...) cmd.Stdout = os.Stdout cmd.Stderr = sw @@ -91,12 +119,9 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { sw.SwitchTarget(os.Stderr) }() - err = cmd.Start() + err := cmd.Start() if err != nil { - if monitor != nil { - monitor.HandleMountFailureOrExit(fmt.Errorf("start ossfs failed: %w", err)) - } - return err + return fmt.Errorf("start ossfs failed: %w", err) } pid := cmd.Process.Pid @@ -111,11 +136,11 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // Wait for mount to complete ossfsExited := make(chan error, 1) - h.wg.Add(1) - h.pids.Store(pid, cmd) + m.driver.wg.Add(1) + m.driver.pids.Store(pid, cmd) go func() { - defer h.wg.Done() - defer h.pids.Delete(pid) + defer m.driver.wg.Done() + defer m.driver.pids.Delete(pid) err := cmd.Wait() if err != nil { @@ -129,17 +154,8 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { } else { klog.InfoS("ossfs exited", "mountpoint", target, "pid", pid) } - // Immediate process-exit handling during mount attempt - // Assume the process exits with no error upon receiving SIGTERM, - // and exits with an error in case of unexpected failures. - if monitor != nil { - monitor.HandleMountFailureOrExit(err) - } // Notify poll loop after metrics are updated ossfsExited <- err - if err := os.Remove(passwdFile); err != nil { - klog.ErrorS(err, "Remove passwd file", "mountpoint", target, "path", passwdFile) - } close(ossfsExited) }() @@ -151,7 +167,7 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { } return false, fmt.Errorf("ossfs exited") default: - notMnt, err := h.raw.IsLikelyNotMountPoint(target) + notMnt, err := m.IsLikelyNotMountPoint(target) if err != nil { klog.ErrorS(err, "check mountpoint", "mountpoint", target) return false, nil @@ -165,10 +181,9 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { }) if err == nil { - if monitor != nil { - monitor.HandleMountSuccess(cmd) - // Start monitoring goroutine (ticker based only) - h.monitorManager.StartMonitoring(target) + op.MountResult = server.OssfsMountResult{ + PID: pid, + ExitChan: ossfsExited, } return nil } @@ -192,25 +207,3 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // Just return the error to caller to avoid double counting. return err } - -func (h *Driver) Init() {} - -func (h *Driver) Terminate() { - // Stop all mount monitoring - h.monitorManager.StopAllMonitoring() - - // terminate all running ossfs - h.pids.Range(func(key, value any) bool { - err := value.(*exec.Cmd).Process.Signal(syscall.SIGTERM) - if err != nil { - klog.ErrorS(err, "Failed to terminate ossfs", "pid", key) - } - klog.V(4).InfoS("Sended sigterm", "pid", key) - return true - }) - - // wait all ossfs processes and monitoring goroutines to exit - h.monitorManager.WaitForAllMonitoring() - h.wg.Wait() - klog.InfoS("All ossfs processes and monitoring goroutines exited") -} diff --git a/pkg/mounter/proxy/server/ossfs2/driver.go b/pkg/mounter/proxy/server/ossfs2/driver.go index a879bb90f..c76bac5c6 100644 --- a/pkg/mounter/proxy/server/ossfs2/driver.go +++ b/pkg/mounter/proxy/server/ossfs2/driver.go @@ -14,10 +14,10 @@ import ( "golang.org/x/sys/unix" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/interceptors" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - serverutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -28,18 +28,27 @@ func init() { } type Driver struct { + mounter.Mounter pids *sync.Map monitorManager *server.MountMonitorManager wg sync.WaitGroup - raw mount.Interface } func NewDriver() *Driver { - return &Driver{ + driver := &Driver{ pids: new(sync.Map), monitorManager: server.NewMountMonitorManager(), - raw: mount.NewWithoutSystemd(""), } + m := &extendedMounter{ + driver: driver, + Interface: mount.NewWithoutSystemd(""), + } + driver.Mounter = mounter.NewForMounter( + m, + interceptors.Ossfs2SecretInterceptor, + interceptors.OssfsMonitorInterceptor, + ) + return driver } func (h *Driver) Name() string { @@ -51,36 +60,54 @@ func (h *Driver) Fstypes() []string { } func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { - options := req.Options - target := req.Target - - // Get or create monitor for this target - var monitor *server.MountMonitor - if req.MetricsPath != "" { - var found bool - monitor, found = h.monitorManager.GetMountMonitor(target, req.MetricsPath, h.raw, true) - if monitor == nil { - klog.Errorf("Failed to get mount monitor for %s, stop monitoring mountpoint status", target) - } else if found { - monitor.IncreaseMountRetryCount() - } - } + return h.ExtendedMount(ctx, &mounter.MountOperation{ + Source: req.Source, + Target: req.Target, + FsType: req.Fstype, + Options: req.Options, + Secrets: req.Secrets, + MetricsPath: req.MetricsPath, + VolumeID: req.VolumeID, + }) +} - // prepare passwd file - passwdFile, err := utils.SaveOssSecretsToFile(req.Secrets, req.Fstype) - if err != nil { - if monitor != nil { - monitor.HandleMountFailureOrExit(err) +func (h *Driver) Init() {} + +func (h *Driver) Terminate() { + // Stop all mount monitoring + h.monitorManager.StopAllMonitoring() + + // terminate all running ossfs2 + h.pids.Range(func(key, value any) bool { + err := value.(*exec.Cmd).Process.Signal(syscall.SIGTERM) + if err != nil { + klog.ErrorS(err, "Failed to terminate ossfs2", "pid", key) } - return err - } + klog.V(4).InfoS("Sended sigterm", "pid", key) + return true + }) + + // wait all ossfs2 processes and monitoring goroutines to exit + h.monitorManager.WaitForAllMonitoring() + h.wg.Wait() + klog.InfoS("All ossfs2 processes and monitoring goroutines exited") +} + +type extendedMounter struct { + driver *Driver + mount.Interface +} - args := []string{"mount", req.Target} +var _ mounter.Mounter = &extendedMounter{} + +func (m *extendedMounter) ExtendedMount(ctx context.Context, op *mounter.MountOperation) error { + options := op.Options + target := op.Target + + args := []string{"mount", op.Target} // ossfs2.0 forbid to use FUSE args // args = append(args, req.MountFlags...) - if passwdFile != "" { - args = append(args, []string{"-c", passwdFile}...) - } + args = append(args, op.Args...) for _, o := range options { args = append(args, fmt.Sprintf("--%s", o)) } @@ -88,7 +115,7 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { var stderrBuf bytes.Buffer multiWriter := io.MultiWriter(os.Stderr, &stderrBuf) - sw := serverutils.NewSwitchableWriter(multiWriter) + sw := server.NewSwitchableWriter(multiWriter) cmd := exec.Command("ossfs2", args...) cmd.Stdout = os.Stdout cmd.Stderr = sw @@ -96,12 +123,9 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { sw.SwitchTarget(os.Stderr) }() - err = cmd.Start() + err := cmd.Start() if err != nil { - if monitor != nil { - monitor.HandleMountFailureOrExit(fmt.Errorf("start ossfs2 failed: %w", err)) - } - return err + return fmt.Errorf("start ossfs2 failed: %w", err) } pid := cmd.Process.Pid @@ -116,11 +140,11 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // Wait for mount to complete ossfsExited := make(chan error, 1) - h.wg.Add(1) - h.pids.Store(pid, cmd) + m.driver.wg.Add(1) + m.driver.pids.Store(pid, cmd) go func() { - defer h.wg.Done() - defer h.pids.Delete(pid) + defer m.driver.wg.Done() + defer m.driver.pids.Delete(pid) err := cmd.Wait() if err != nil { @@ -134,17 +158,8 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { } else { klog.InfoS("ossfs2 exited", "mountpoint", target, "pid", pid) } - // Immediate process-exit handling during mount attempt - // Assume the process exits with no error upon receiving SIGTERM, - // and exits with an error in case of unexpected failures. - if monitor != nil { - monitor.HandleMountFailureOrExit(err) - } // Notify poll loop after metrics are updated ossfsExited <- err - if err := os.Remove(passwdFile); err != nil { - klog.ErrorS(err, "Remove configuration file", "mountpoint", target, "path", passwdFile) - } close(ossfsExited) }() @@ -156,7 +171,7 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { } return false, fmt.Errorf("ossfs2 exited") default: - notMnt, err := h.raw.IsLikelyNotMountPoint(target) + notMnt, err := m.IsLikelyNotMountPoint(target) if err != nil { klog.ErrorS(err, "check mountpoint", "mountpoint", target) return false, nil @@ -170,10 +185,9 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { }) if err == nil { - if monitor != nil { - monitor.HandleMountSuccess(cmd) - // Start monitoring goroutine (ticker based only) - h.monitorManager.StartMonitoring(target) + op.MountResult = server.OssfsMountResult{ + PID: pid, + ExitChan: ossfsExited, } return nil } @@ -197,25 +211,3 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // Just return the error to caller to avoid double counting. return err } - -func (h *Driver) Init() {} - -func (h *Driver) Terminate() { - // Stop all mount monitoring - h.monitorManager.StopAllMonitoring() - - // terminate all running ossfs2 - h.pids.Range(func(key, value any) bool { - err := value.(*exec.Cmd).Process.Signal(syscall.SIGTERM) - if err != nil { - klog.ErrorS(err, "Failed to terminate ossfs2", "pid", key) - } - klog.V(4).InfoS("Sended sigterm", "pid", key) - return true - }) - - // wait all ossfs2 processes and monitoring goroutines to exit - h.monitorManager.WaitForAllMonitoring() - h.wg.Wait() - klog.InfoS("All ossfs2 processes and monitoring goroutines exited") -} diff --git a/pkg/mounter/proxy_mounter.go b/pkg/mounter/proxy_mounter.go index 49a558da0..356c97545 100644 --- a/pkg/mounter/proxy_mounter.go +++ b/pkg/mounter/proxy_mounter.go @@ -1,6 +1,7 @@ package mounter import ( + "context" "errors" "fmt" @@ -14,6 +15,8 @@ type ProxyMounter struct { mountutils.Interface } +var _ Mounter = &ProxyMounter{} + func NewProxyMounter(socketPath string, inner mountutils.Interface) Mounter { return &ProxyMounter{ socketPath: socketPath, @@ -21,20 +24,16 @@ func NewProxyMounter(socketPath string, inner mountutils.Interface) Mounter { } } -func (m *ProxyMounter) ExtendedMount(source, target, fstype string, options []string, params *ExtendedMountParams) error { - // Parameters in ExtendedMountParams are optional - if params == nil { - params = &ExtendedMountParams{} - } - +func (m *ProxyMounter) ExtendedMount(ctx context.Context, op *MountOperation) error { dclient := client.NewClient(m.socketPath) resp, err := dclient.Mount(&proxy.MountRequest{ - Source: source, - Target: target, - Fstype: fstype, - Options: options, - Secrets: params.Secrets, - MetricsPath: params.MetricsPath, + Source: op.Source, + Target: op.Target, + Fstype: op.FsType, + Options: op.Options, + Secrets: op.Secrets, + MetricsPath: op.MetricsPath, + VolumeID: op.VolumeID, }) if err != nil { return fmt.Errorf("call mounter daemon: %w", err) @@ -43,7 +42,7 @@ func (m *ProxyMounter) ExtendedMount(source, target, fstype string, options []st if err != nil { return fmt.Errorf("failed to mount: %w", err) } - notMnt, err := m.IsLikelyNotMountPoint(target) + notMnt, err := m.IsLikelyNotMountPoint(op.Target) if err != nil { return err } @@ -54,5 +53,10 @@ func (m *ProxyMounter) ExtendedMount(source, target, fstype string, options []st } func (m *ProxyMounter) Mount(source string, target string, fstype string, options []string) error { - return m.ExtendedMount(source, target, fstype, options, nil) + return m.ExtendedMount(context.Background(), &MountOperation{ + Source: source, + Target: target, + FsType: fstype, + Options: options, + }) } diff --git a/pkg/nas/mounter.go b/pkg/nas/mounter.go index 203be51e7..1c5d46ce2 100644 --- a/pkg/nas/mounter.go +++ b/pkg/nas/mounter.go @@ -1,6 +1,8 @@ package nas import ( + "context" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" "k8s.io/klog/v2" mountutils "k8s.io/mount-utils" @@ -11,18 +13,20 @@ type NasMounter struct { alinasMounter mountutils.Interface } -func (m *NasMounter) Mount(source string, target string, fstype string, options []string) (err error) { +var _ mounter.Mounter = &NasMounter{} + +func (m *NasMounter) ExtendedMount(ctx context.Context, op *mounter.MountOperation) (err error) { logger := klog.Background().WithValues( - "source", source, - "target", target, - "options", options, - "fstype", fstype, + "source", op.Source, + "target", op.Target, + "options", op.Options, + "fstype", op.FsType, ) - switch fstype { + switch op.FsType { case "alinas", "cpfs", "cpfs-nfs": - err = m.alinasMounter.Mount(source, target, fstype, options) + err = m.alinasMounter.Mount(op.Source, op.Target, op.FsType, op.Options) default: - err = m.Interface.Mount(source, target, fstype, options) + err = m.Mount(op.Source, op.Target, op.FsType, op.Options) } if err != nil { logger.Error(err, "failed to mount") @@ -32,22 +36,17 @@ func (m *NasMounter) Mount(source string, target string, fstype string, options return err } -func newNasMounter(agentMode bool) mountutils.Interface { +func newNasMounter(agentMode bool, socketPath string) mounter.Mounter { inner := mountutils.NewWithoutSystemd("") m := &NasMounter{ Interface: inner, alinasMounter: inner, } - if !agentMode { + switch { + case socketPath != "": + m.alinasMounter = mounter.NewProxyMounter(socketPath, inner) + case !agentMode: // normal case, use connector mounter to ensure backward compatibility m.alinasMounter = mounter.NewConnectorMounter(inner, "") } return m } - -func newNasMounterWithProxy(socketPath string) mountutils.Interface { - inner := mountutils.NewWithoutSystemd("") - return &NasMounter{ - Interface: inner, - alinasMounter: mounter.NewProxyMounter(socketPath, inner), - } -} diff --git a/pkg/nas/mounter_test.go b/pkg/nas/mounter_test.go index b299085b3..e98baf5bf 100644 --- a/pkg/nas/mounter_test.go +++ b/pkg/nas/mounter_test.go @@ -1,9 +1,11 @@ package nas import ( + "context" "errors" "testing" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" "github.com/stretchr/testify/assert" mountutils "k8s.io/mount-utils" ) @@ -25,7 +27,7 @@ func (m *errorMockMounter) Mount(source string, target string, fstype string, op } func TestNewNasMounter(t *testing.T) { - actual := newNasMounter(true) + actual := newNasMounter(true, "") assert.NotNil(t, actual) } @@ -34,7 +36,7 @@ func TestNasMounter_MountSuccess(t *testing.T) { Interface: &successMockMounter{}, alinasMounter: &successMockMounter{}, } - err := nasMounter.Mount("", "", "nas", []string{}) + err := nasMounter.ExtendedMount(context.Background(), &mounter.MountOperation{}) assert.NoError(t, err) } @@ -43,6 +45,8 @@ func TestNasMounter_FuseMountError(t *testing.T) { Interface: &errorMockMounter{}, alinasMounter: &errorMockMounter{}, } - err := nasMounter.Mount("", "", "cpfs", []string{}) + err := nasMounter.ExtendedMount(context.Background(), &mounter.MountOperation{ + FsType: "cpfs", + }) assert.Error(t, err) } diff --git a/pkg/nas/nodeserver.go b/pkg/nas/nodeserver.go index 365e07c94..b09f76c14 100644 --- a/pkg/nas/nodeserver.go +++ b/pkg/nas/nodeserver.go @@ -34,6 +34,7 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/dadi" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/losetup" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/internal" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/rund/directvolume" @@ -42,12 +43,11 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" - mountutils "k8s.io/mount-utils" ) type nodeServer struct { config *internal.NodeConfig - mounter mountutils.Interface + mounter mounter.Mounter locks *utils.VolumeLocks recorder record.EventRecorder common.GenericNodeServer @@ -68,15 +68,10 @@ func newNodeServer(config *internal.NodeConfig) *nodeServer { GenericNodeServer: common.GenericNodeServer{ NodeID: config.NodeName, }, + mounter: newNasMounter(config.AgentMode, config.MountProxySocket), } - if config.MountProxySocket == "" { - if !ns.config.AgentMode { - ns.recorder = utils.NewEventRecorder() - } - ns.mounter = newNasMounter(ns.config.AgentMode) - } else { - ns.recorder = utils.NewEventRecorder() - ns.mounter = newNasMounterWithProxy(config.MountProxySocket) + if !ns.config.AgentMode { + ns.recorder = utils.NewEventRecorder() // There is no kubeconfig under agent mode } return ns } @@ -96,6 +91,8 @@ type Options struct { MountProtocol string `json:"mountProtocol"` ClientType string `json:"clientType"` FSType string `json:"fsType"` + AkID string + AkSecret string } // RunvNasOptions struct definition @@ -251,6 +248,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis opt.MountProtocol = strings.TrimSpace(value) } } + opt.AkID = req.Secrets[akIDKey] + opt.AkSecret = req.Secrets[akSecretKey] if cnfsName != "" { cnfs, err := ns.getCNFS(ctx, req, cnfsName) diff --git a/pkg/nas/utils.go b/pkg/nas/utils.go index b0dd92d61..103b3fd39 100644 --- a/pkg/nas/utils.go +++ b/pkg/nas/utils.go @@ -17,6 +17,7 @@ limitations under the License. package nas import ( + "context" "errors" "fmt" "os" @@ -27,7 +28,8 @@ import ( "github.com/alibabacloud-go/tea/tea" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/losetup" - mounter "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + mounterutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/cloud" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/interfaces" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" @@ -52,6 +54,8 @@ const ( TcpSlotTableEntries = "/proc/sys/sunrpc/tcp_slot_table_entries" TcpSlotTableEntriesValue = "128\n" + akIDKey = "akId" + akSecretKey = "akSecret" filesystemIDKey = "fileSystemId" filesystemTypeKey = "fileSystemType" ) @@ -66,11 +70,12 @@ type RoleAuth struct { Code string } -func doMount(mounter mountutils.Interface, opt *Options, targetPath, volumeId, podUid string) error { +func doMount(m mounter.Mounter, opt *Options, targetPath, volumeId, podUid string) error { var ( mountFstype string source string combinedOptions []string + secrets map[string]string isPathNotFound func(error) bool ) if opt.Accesspoint != "" { @@ -81,6 +86,12 @@ func doMount(mounter mountutils.Interface, opt *Options, targetPath, volumeId, p if opt.Options != "" { combinedOptions = append(combinedOptions, opt.Options) } + if opt.AkID != "" && opt.AkSecret != "" { + secrets = map[string]string{ + akIDKey: opt.AkID, + akSecretKey: opt.AkSecret, + } + } switch opt.ClientType { case EFCClient: @@ -127,7 +138,15 @@ func doMount(mounter mountutils.Interface, opt *Options, targetPath, volumeId, p return strings.Contains(err.Error(), "reason given by server: No such file or directory") || strings.Contains(err.Error(), "access denied by server while mounting") } } - err := mounter.Mount(source, targetPath, mountFstype, combinedOptions) + + err := m.ExtendedMount(context.Background(), &mounter.MountOperation{ + Source: source, + Target: targetPath, + FsType: mountFstype, + Options: combinedOptions, + Secrets: secrets, + VolumeID: volumeId, + }) if err == nil { return nil } @@ -154,16 +173,30 @@ func doMount(mounter mountutils.Interface, opt *Options, targetPath, volumeId, p return err } defer os.Remove(tmpPath) - if err := mounter.Mount(rootSource, tmpPath, mountFstype, combinedOptions); err != nil { + if err := m.ExtendedMount(context.Background(), &mounter.MountOperation{ + Source: rootSource, + Target: tmpPath, + FsType: mountFstype, + Options: combinedOptions, + Secrets: secrets, + VolumeID: volumeId, + }); err != nil { return err } if err := os.MkdirAll(filepath.Join(tmpPath, relPath), os.ModePerm); err != nil { return err } - if err := cleanupMountpoint(mounter, tmpPath); err != nil { + if err := cleanupMountpoint(m, tmpPath); err != nil { klog.Errorf("failed to cleanup tmp mountpoint %s: %v", tmpPath, err) } - return mounter.Mount(source, targetPath, mountFstype, combinedOptions) + return m.ExtendedMount(context.Background(), &mounter.MountOperation{ + Source: source, + Target: targetPath, + FsType: mountFstype, + Options: combinedOptions, + Secrets: secrets, + VolumeID: volumeId, + }) } // check system config, @@ -181,7 +214,7 @@ func ParseMountFlags(mntOptions []string) (string, string) { var vers string var otherOptions []string for _, options := range mntOptions { - for _, option := range mounter.SplitMountOptions(options) { + for _, option := range mounterutils.SplitMountOptions(options) { if option == "" { continue } @@ -201,7 +234,7 @@ func ParseMountFlags(mntOptions []string) (string, string) { func addTLSMountOptions(baseOptions []string) []string { for _, options := range baseOptions { - for _, option := range mounter.SplitMountOptions(options) { + for _, option := range mounterutils.SplitMountOptions(options) { if option == "" { continue } diff --git a/pkg/oss/nodeserver.go b/pkg/oss/nodeserver.go index 8a4029c41..327e3133a 100644 --- a/pkg/oss/nodeserver.go +++ b/pkg/oss/nodeserver.go @@ -26,6 +26,7 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/interceptors" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/oss" mounterutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" @@ -66,6 +67,13 @@ const ( // use unifiedFsType instead var unifiedFsType = OssFsType +// Used by OssCmdMounter only, since ProxyMounter is only the client side, +// the interceptors are applied at the server side. +var ossInterceptors = map[string][]mounter.MountInterceptor{ + OssFsType: {interceptors.OssfsSecretInterceptor}, + OssFs2Type: {interceptors.Ossfs2SecretInterceptor}, +} + func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { return &csi.NodeGetCapabilitiesResponse{Capabilities: []*csi.NodeServiceCapability{ { @@ -161,7 +169,10 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - ossfsMounter = mounter.NewOssCmdMounter(ns.ossfsPaths[opts.FuseType], req.VolumeId, ns.rawMounter) + ossfsMounter = mounter.NewForMounter( + mounter.NewOssCmdMounter(ns.ossfsPaths[opts.FuseType], req.VolumeId, ns.rawMounter), + ossInterceptors[opts.FuseType]..., + ) } else { ossfsMounter = mounter.NewProxyMounter(socketPath, ns.rawMounter) } @@ -169,12 +180,14 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // When work as csi-agent, directly mount on the target path. if ns.skipAttach { metricsPath := utils.WriteMetricsInfo(metricsPathPrefix, req, opts.MetricsTop, opts.FuseType, "oss", opts.Bucket) - err := ossfsMounter.ExtendedMount( - mountSource, targetPath, opts.FuseType, - mountOptions, &mounter.ExtendedMountParams{ - Secrets: authCfg.Secrets, - MetricsPath: metricsPath, - }) + err := ossfsMounter.ExtendedMount(ctx, &mounter.MountOperation{ + Source: mountSource, + Target: targetPath, + FsType: opts.FuseType, + Options: mountOptions, + Secrets: authCfg.Secrets, + MetricsPath: metricsPath, + }) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -191,12 +204,14 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } if notMnt { metricsPath := utils.WriteSharedMetricsInfo(metricsPathPrefix, req, opts.FuseType, "oss", opts.Bucket, attachPath) - err := ossfsMounter.ExtendedMount( - mountSource, attachPath, opts.FuseType, - mountOptions, &mounter.ExtendedMountParams{ - Secrets: authCfg.Secrets, - MetricsPath: metricsPath, - }) + err := ossfsMounter.ExtendedMount(ctx, &mounter.MountOperation{ + Source: mountSource, + Target: attachPath, + FsType: opts.FuseType, + Options: mountOptions, + Secrets: authCfg.Secrets, + MetricsPath: metricsPath, + }) if err != nil { return nil, status.Error(codes.Internal, err.Error()) }