From 867b93f9bf1bb21dbe1f082106115e559583e469 Mon Sep 17 00:00:00 2001 From: iltyty Date: Mon, 17 Nov 2025 18:03:47 +0800 Subject: [PATCH 1/6] refactor: split non-mounting actions into interceptors --- pkg/mounter/cmd_mounter.go | 53 ++++++++++------------- pkg/mounter/interceptors/alinas_secret.go | 17 ++++++++ pkg/mounter/interceptors/ossfs_secret.go | 35 +++++++++++++++ pkg/mounter/mounter.go | 42 ++++++++++++++++-- pkg/mounter/proxy/protocol.go | 1 + pkg/mounter/proxy/server/alinas/driver.go | 50 +++++++++++++++------ pkg/mounter/proxy_mounter.go | 32 ++++++++------ pkg/nas/mounter.go | 15 +++---- pkg/nas/mounter_test.go | 2 +- pkg/nas/nodeserver.go | 11 ++--- pkg/oss/nodeserver.go | 39 +++++++++++------ 11 files changed, 203 insertions(+), 94 deletions(-) create mode 100644 pkg/mounter/interceptors/alinas_secret.go create mode 100644 pkg/mounter/interceptors/ossfs_secret.go diff --git a/pkg/mounter/cmd_mounter.go b/pkg/mounter/cmd_mounter.go index b603b651e..da3dc6731 100644 --- a/pkg/mounter/cmd_mounter.go +++ b/pkg/mounter/cmd_mounter.go @@ -7,41 +7,32 @@ import ( "os/exec" "time" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "k8s.io/mount-utils" ) const timeout = time.Second * 10 type OssCmdMounter struct { - mount.Interface - volumeId string execPath string + volumeID string + mount.Interface } -func NewOssCmdMounter(execPath, volumeId string, inner mount.Interface) Mounter { +var _ Mounter = &OssCmdMounter{} + +func NewOssCmdMounter(execPath, volumeID string, inner mount.Interface) Mounter { return &OssCmdMounter{ execPath: execPath, - volumeId: volumeId, + volumeID: volumeID, Interface: inner, } } -func (m *OssCmdMounter) ExtendedMount(source, target, fstype string, options []string, params *ExtendedMountParams) error { +func (m *OssCmdMounter) ExtendedMount(_ context.Context, op *MountOperation) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(timeout)) defer cancel() - // Parameters in ExtendedMountParams are optional - if params == nil { - params = &ExtendedMountParams{} - } - passwd, err := utils.SaveOssSecretsToFile(params.Secrets, fstype) - if err != nil { - return err - } - - args := getArgs(source, target, fstype, passwd, options) - cmd := exec.CommandContext(ctx, m.execPath, args...) + cmd := exec.CommandContext(ctx, m.execPath, getArgs(op)...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr @@ -51,20 +42,20 @@ func (m *OssCmdMounter) ExtendedMount(source, target, fstype string, options []s return nil } -func getArgs(source, target, fstype, passwdFile string, options []string) []string { - if fstype == "ossfs" { - if passwdFile != "" { - options = append(options, "passwd_file="+passwdFile) - } - return mount.MakeMountArgs(source, target, "", options) +func getArgs(op *MountOperation) []string { + if op == nil { + return nil } - // ossfs2 - args := []string{"mount", target} - if passwdFile != "" { - args = append(args, []string{"-c", passwdFile}...) - } - for _, o := range options { - args = append(args, fmt.Sprintf("--%s", o)) + switch op.FsType { + case "ossfs": + return mount.MakeMountArgs(op.Source, op.Target, "", op.Options) + case "ossfs2": + args := []string{"mount", op.Target} + for _, o := range op.Options { + args = append(args, fmt.Sprintf("--%s", o)) + } + return args + default: + return nil } - return args } diff --git a/pkg/mounter/interceptors/alinas_secret.go b/pkg/mounter/interceptors/alinas_secret.go new file mode 100644 index 000000000..75879ae0d --- /dev/null +++ b/pkg/mounter/interceptors/alinas_secret.go @@ -0,0 +1,17 @@ +package interceptors + +import ( + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" +) + +type AlinasSecretInterceptor struct{} + +var _ mounter.MountInterceptor = AlinasSecretInterceptor{} + +func NewAlinasSecretInterceptor() mounter.MountInterceptor { + return AlinasSecretInterceptor{} +} + +func (AlinasSecretInterceptor) BeforeMount(req *mounter.MountOperation) (*mounter.MountOperation, error) { + return req, nil +} diff --git a/pkg/mounter/interceptors/ossfs_secret.go b/pkg/mounter/interceptors/ossfs_secret.go new file mode 100644 index 000000000..8c7906d5f --- /dev/null +++ b/pkg/mounter/interceptors/ossfs_secret.go @@ -0,0 +1,35 @@ +package interceptors + +import ( + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" +) + +type OssfsSecretInterceptor struct { + credFileKey string +} + +var _ mounter.MountInterceptor = OssfsSecretInterceptor{} + +func NewOssfsSecretInterceptor() mounter.MountInterceptor { + return OssfsSecretInterceptor{ + credFileKey: "passwd_file", + } +} + +func NewOssfs2SecretInterceptor() mounter.MountInterceptor { + return OssfsSecretInterceptor{ + credFileKey: "-c", + } +} + +func (i OssfsSecretInterceptor) BeforeMount(req *mounter.MountOperation) (*mounter.MountOperation, error) { + filePath, err := utils.SaveOssSecretsToFile(req.Secrets, req.FsType) + if err != nil { + return req, err + } + if filePath != "" { + req.Options = append(req.Options, i.credFileKey+"="+filePath) + } + return req, nil +} diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go index 3596abf8f..8be75f600 100644 --- a/pkg/mounter/mounter.go +++ b/pkg/mounter/mounter.go @@ -1,15 +1,49 @@ package mounter import ( + "context" + mountutils "k8s.io/mount-utils" ) -type ExtendedMountParams struct { +type Mounter interface { + mountutils.Interface + ExtendedMount(ctx context.Context, op *MountOperation) error +} + +type MountOperation struct { + Source string + Target string + FsType string + Options []string Secrets map[string]string MetricsPath string + VolumeID string } -type Mounter interface { - mountutils.Interface - ExtendedMount(source, target, fstype string, options []string, params *ExtendedMountParams) error +type MountInterceptor interface { + BeforeMount(op *MountOperation) (*MountOperation, error) +} + +type MountWorkflow struct { + Mounter + interceptors []MountInterceptor +} + +var _ Mounter = &MountWorkflow{} + +func (w *MountWorkflow) ExtendedMount(ctx context.Context, op *MountOperation) (err error) { + for _, interceptor := range w.interceptors { + if op, err = interceptor.BeforeMount(op); err != nil { + return err + } + } + return w.Mounter.ExtendedMount(ctx, op) +} + +func NewForMounter(m Mounter, interceptors ...MountInterceptor) Mounter { + return &MountWorkflow{ + Mounter: m, + interceptors: interceptors, + } } diff --git a/pkg/mounter/proxy/protocol.go b/pkg/mounter/proxy/protocol.go index 5d94a4495..9585334f8 100644 --- a/pkg/mounter/proxy/protocol.go +++ b/pkg/mounter/proxy/protocol.go @@ -43,4 +43,5 @@ type MountRequest struct { MountFlags []string `json:"mountFlags,omitempty"` Secrets map[string]string `json:"secrets,omitempty"` MetricsPath string `json:"metricsPath,omitempty"` + VolumeID string `json:"volumeID,omitempty"` } diff --git a/pkg/mounter/proxy/server/alinas/driver.go b/pkg/mounter/proxy/server/alinas/driver.go index a31e4a64d..608cdc4a7 100644 --- a/pkg/mounter/proxy/server/alinas/driver.go +++ b/pkg/mounter/proxy/server/alinas/driver.go @@ -11,9 +11,11 @@ import ( "syscall" "time" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/interceptors" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - mounter "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" + mounterutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -25,11 +27,16 @@ const ( ) func init() { - server.RegisterDriver(&Driver{mounter: mount.New("")}) + server.RegisterDriver(&Driver{ + Mounter: mounter.NewForMounter( + &extendedMounter{Interface: mount.New("")}, + interceptors.NewAlinasSecretInterceptor(), + ), + }) } type Driver struct { - mounter mount.Interface + mounter.Mounter } func (h *Driver) Name() string { @@ -41,15 +48,15 @@ func (h *Driver) Fstypes() []string { } func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { - klog.InfoS("Mounting", "fstype", req.Fstype, "source", req.Source, "target", req.Target, "options", req.Options) - options := append(req.Options, "no_start_watchdog") - if req.Fstype == fstypeAlinas { - // options = append(options, "no_atomic_move", "auto_fallback_nfs") - options = append(options, "no_atomic_move") - options = addAutoFallbackNFSMountOptions(options) - } - - return h.mounter.Mount(req.Source, req.Target, req.Fstype, options) + return h.ExtendedMount(ctx, &mounter.MountOperation{ + Source: req.Source, + Target: req.Target, + FsType: req.Fstype, + Options: req.Options, + Secrets: req.Secrets, + MetricsPath: req.MetricsPath, + VolumeID: req.VolumeID, + }) } func (h *Driver) Init() { @@ -79,7 +86,7 @@ func addAutoFallbackNFSMountOptions(mountOptions []string) []string { isEFC := false isVSC := false for _, options := range mountOptions { - for _, option := range mounter.SplitMountOptions(options) { + for _, option := range mounterutils.SplitMountOptions(options) { if option == "" { continue } @@ -161,3 +168,20 @@ func copyFile(src, dst string) error { return dstFile.Sync() } + +type extendedMounter struct { + mount.Interface +} + +var _ mounter.Mounter = &extendedMounter{} + +func (m *extendedMounter) ExtendedMount(ctx context.Context, op *mounter.MountOperation) error { + klog.InfoS("Mounting", "fstype", op.FsType, "source", op.Source, "target", op.Target, "options", op.Options) + op.Options = append(op.Options, "no_start_watchdog") + if op.FsType == fstypeAlinas { + // options = append(options, "no_atomic_move", "auto_fallback_nfs") + op.Options = append(op.Options, "no_atomic_move") + op.Options = addAutoFallbackNFSMountOptions(op.Options) + } + return m.Mount(op.Source, op.Target, op.FsType, op.Options) +} diff --git a/pkg/mounter/proxy_mounter.go b/pkg/mounter/proxy_mounter.go index 49a558da0..356c97545 100644 --- a/pkg/mounter/proxy_mounter.go +++ b/pkg/mounter/proxy_mounter.go @@ -1,6 +1,7 @@ package mounter import ( + "context" "errors" "fmt" @@ -14,6 +15,8 @@ type ProxyMounter struct { mountutils.Interface } +var _ Mounter = &ProxyMounter{} + func NewProxyMounter(socketPath string, inner mountutils.Interface) Mounter { return &ProxyMounter{ socketPath: socketPath, @@ -21,20 +24,16 @@ func NewProxyMounter(socketPath string, inner mountutils.Interface) Mounter { } } -func (m *ProxyMounter) ExtendedMount(source, target, fstype string, options []string, params *ExtendedMountParams) error { - // Parameters in ExtendedMountParams are optional - if params == nil { - params = &ExtendedMountParams{} - } - +func (m *ProxyMounter) ExtendedMount(ctx context.Context, op *MountOperation) error { dclient := client.NewClient(m.socketPath) resp, err := dclient.Mount(&proxy.MountRequest{ - Source: source, - Target: target, - Fstype: fstype, - Options: options, - Secrets: params.Secrets, - MetricsPath: params.MetricsPath, + Source: op.Source, + Target: op.Target, + Fstype: op.FsType, + Options: op.Options, + Secrets: op.Secrets, + MetricsPath: op.MetricsPath, + VolumeID: op.VolumeID, }) if err != nil { return fmt.Errorf("call mounter daemon: %w", err) @@ -43,7 +42,7 @@ func (m *ProxyMounter) ExtendedMount(source, target, fstype string, options []st if err != nil { return fmt.Errorf("failed to mount: %w", err) } - notMnt, err := m.IsLikelyNotMountPoint(target) + notMnt, err := m.IsLikelyNotMountPoint(op.Target) if err != nil { return err } @@ -54,5 +53,10 @@ func (m *ProxyMounter) ExtendedMount(source, target, fstype string, options []st } func (m *ProxyMounter) Mount(source string, target string, fstype string, options []string) error { - return m.ExtendedMount(source, target, fstype, options, nil) + return m.ExtendedMount(context.Background(), &MountOperation{ + Source: source, + Target: target, + FsType: fstype, + Options: options, + }) } diff --git a/pkg/nas/mounter.go b/pkg/nas/mounter.go index 203be51e7..29f278d49 100644 --- a/pkg/nas/mounter.go +++ b/pkg/nas/mounter.go @@ -32,22 +32,17 @@ func (m *NasMounter) Mount(source string, target string, fstype string, options return err } -func newNasMounter(agentMode bool) mountutils.Interface { +func newNasMounter(agentMode bool, socketPath string) mountutils.Interface { inner := mountutils.NewWithoutSystemd("") m := &NasMounter{ Interface: inner, alinasMounter: inner, } - if !agentMode { + switch { + case socketPath != "": + m.alinasMounter = mounter.NewProxyMounter(socketPath, inner) + case !agentMode: // normal case, use connector mounter to ensure backward compatability m.alinasMounter = mounter.NewConnectorMounter(inner, "") } return m } - -func newNasMounterWithProxy(socketPath string) mountutils.Interface { - inner := mountutils.NewWithoutSystemd("") - return &NasMounter{ - Interface: inner, - alinasMounter: mounter.NewProxyMounter(socketPath, inner), - } -} diff --git a/pkg/nas/mounter_test.go b/pkg/nas/mounter_test.go index b299085b3..65860d7ad 100644 --- a/pkg/nas/mounter_test.go +++ b/pkg/nas/mounter_test.go @@ -25,7 +25,7 @@ func (m *errorMockMounter) Mount(source string, target string, fstype string, op } func TestNewNasMounter(t *testing.T) { - actual := newNasMounter(true) + actual := newNasMounter(true, "") assert.NotNil(t, actual) } diff --git a/pkg/nas/nodeserver.go b/pkg/nas/nodeserver.go index 2623f1f7b..5db299284 100644 --- a/pkg/nas/nodeserver.go +++ b/pkg/nas/nodeserver.go @@ -68,15 +68,10 @@ func newNodeServer(config *internal.NodeConfig) *nodeServer { GenericNodeServer: common.GenericNodeServer{ NodeID: config.NodeName, }, + mounter: newNasMounter(config.AgentMode, config.MountProxySocket), } - if config.MountProxySocket == "" { - if !ns.config.AgentMode { - ns.recorder = utils.NewEventRecorder() - } - ns.mounter = newNasMounter(ns.config.AgentMode) - } else { - ns.recorder = utils.NewEventRecorder() - ns.mounter = newNasMounterWithProxy(config.MountProxySocket) + if !ns.config.AgentMode { + ns.recorder = utils.NewEventRecorder() // There is no kubeconfig under agent mode } return ns } diff --git a/pkg/oss/nodeserver.go b/pkg/oss/nodeserver.go index 8a4029c41..11e410310 100644 --- a/pkg/oss/nodeserver.go +++ b/pkg/oss/nodeserver.go @@ -26,6 +26,7 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/interceptors" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/oss" mounterutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" @@ -66,6 +67,11 @@ const ( // use unifiedFsType instead var unifiedFsType = OssFsType +var ossInterceptors = map[string][]mounter.MountInterceptor{ + OssFsType: {interceptors.NewOssfsSecretInterceptor()}, + OssFs2Type: {interceptors.NewOssfs2SecretInterceptor()}, +} + func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { return &csi.NodeGetCapabilitiesResponse{Capabilities: []*csi.NodeServiceCapability{ { @@ -161,7 +167,10 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - ossfsMounter = mounter.NewOssCmdMounter(ns.ossfsPaths[opts.FuseType], req.VolumeId, ns.rawMounter) + ossfsMounter = mounter.NewForMounter( + mounter.NewOssCmdMounter(ns.ossfsPaths[opts.FuseType], req.VolumeId, ns.rawMounter), + ossInterceptors[opts.FuseType]..., + ) } else { ossfsMounter = mounter.NewProxyMounter(socketPath, ns.rawMounter) } @@ -169,12 +178,14 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // When work as csi-agent, directly mount on the target path. if ns.skipAttach { metricsPath := utils.WriteMetricsInfo(metricsPathPrefix, req, opts.MetricsTop, opts.FuseType, "oss", opts.Bucket) - err := ossfsMounter.ExtendedMount( - mountSource, targetPath, opts.FuseType, - mountOptions, &mounter.ExtendedMountParams{ - Secrets: authCfg.Secrets, - MetricsPath: metricsPath, - }) + err := ossfsMounter.ExtendedMount(ctx, &mounter.MountOperation{ + Source: mountSource, + Target: targetPath, + FsType: opts.FuseType, + Options: mountOptions, + Secrets: authCfg.Secrets, + MetricsPath: metricsPath, + }) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -191,12 +202,14 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } if notMnt { metricsPath := utils.WriteSharedMetricsInfo(metricsPathPrefix, req, opts.FuseType, "oss", opts.Bucket, attachPath) - err := ossfsMounter.ExtendedMount( - mountSource, attachPath, opts.FuseType, - mountOptions, &mounter.ExtendedMountParams{ - Secrets: authCfg.Secrets, - MetricsPath: metricsPath, - }) + err := ossfsMounter.ExtendedMount(ctx, &mounter.MountOperation{ + Source: mountSource, + Target: attachPath, + FsType: opts.FuseType, + Options: mountOptions, + Secrets: authCfg.Secrets, + MetricsPath: metricsPath, + }) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } From 33a21356f2cda87bb38a65faa1b058d0427c0a1f Mon Sep 17 00:00:00 2001 From: iltyty Date: Fri, 14 Nov 2025 17:41:34 +0800 Subject: [PATCH 2/6] feat: support NAS access point ram authentication --- pkg/mounter/interceptors/alinas_secret.go | 35 +++++++++++++++- pkg/nas/mounter.go | 24 ++++++----- pkg/nas/mounter_test.go | 8 +++- pkg/nas/nodeserver.go | 8 +++- pkg/nas/utils.go | 49 +++++++++++++++++++---- 5 files changed, 100 insertions(+), 24 deletions(-) diff --git a/pkg/mounter/interceptors/alinas_secret.go b/pkg/mounter/interceptors/alinas_secret.go index 75879ae0d..7ad4331a5 100644 --- a/pkg/mounter/interceptors/alinas_secret.go +++ b/pkg/mounter/interceptors/alinas_secret.go @@ -1,7 +1,12 @@ package interceptors import ( + "fmt" + "os" + "path" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "k8s.io/klog/v2" ) type AlinasSecretInterceptor struct{} @@ -12,6 +17,32 @@ func NewAlinasSecretInterceptor() mounter.MountInterceptor { return AlinasSecretInterceptor{} } -func (AlinasSecretInterceptor) BeforeMount(req *mounter.MountOperation) (*mounter.MountOperation, error) { - return req, nil +func (AlinasSecretInterceptor) BeforeMount(op *mounter.MountOperation) (*mounter.MountOperation, error) { + if op == nil || op.Secrets == nil { + return op, nil + } + + tmpDir, err := os.MkdirTemp("", "alinas-") + if err != nil { + return op, err + } + + credFileContent := makeCredFileContent(op.Secrets) + credFilePath := path.Join(tmpDir, op.VolumeID+".credentials") + if err = os.WriteFile(credFilePath, credFileContent, 0o600); err != nil { + return op, err + } + + klog.V(4).InfoS("Created alinas credential file", "path", credFilePath) + op.Options = append(op.Options, "ram_config_file="+credFilePath) + return op, nil +} + +func makeCredFileContent(secrets map[string]string) []byte { + return fmt.Appendf( + nil, + "[NASCredentials]\naccessKeyID=%s\naccessKeySecret=%s", + secrets["akId"], + secrets["akSecret"], + ) } diff --git a/pkg/nas/mounter.go b/pkg/nas/mounter.go index 29f278d49..1c5d46ce2 100644 --- a/pkg/nas/mounter.go +++ b/pkg/nas/mounter.go @@ -1,6 +1,8 @@ package nas import ( + "context" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" "k8s.io/klog/v2" mountutils "k8s.io/mount-utils" @@ -11,18 +13,20 @@ type NasMounter struct { alinasMounter mountutils.Interface } -func (m *NasMounter) Mount(source string, target string, fstype string, options []string) (err error) { +var _ mounter.Mounter = &NasMounter{} + +func (m *NasMounter) ExtendedMount(ctx context.Context, op *mounter.MountOperation) (err error) { logger := klog.Background().WithValues( - "source", source, - "target", target, - "options", options, - "fstype", fstype, + "source", op.Source, + "target", op.Target, + "options", op.Options, + "fstype", op.FsType, ) - switch fstype { + switch op.FsType { case "alinas", "cpfs", "cpfs-nfs": - err = m.alinasMounter.Mount(source, target, fstype, options) + err = m.alinasMounter.Mount(op.Source, op.Target, op.FsType, op.Options) default: - err = m.Interface.Mount(source, target, fstype, options) + err = m.Mount(op.Source, op.Target, op.FsType, op.Options) } if err != nil { logger.Error(err, "failed to mount") @@ -32,7 +36,7 @@ func (m *NasMounter) Mount(source string, target string, fstype string, options return err } -func newNasMounter(agentMode bool, socketPath string) mountutils.Interface { +func newNasMounter(agentMode bool, socketPath string) mounter.Mounter { inner := mountutils.NewWithoutSystemd("") m := &NasMounter{ Interface: inner, @@ -41,7 +45,7 @@ func newNasMounter(agentMode bool, socketPath string) mountutils.Interface { switch { case socketPath != "": m.alinasMounter = mounter.NewProxyMounter(socketPath, inner) - case !agentMode: // normal case, use connector mounter to ensure backward compatability + case !agentMode: // normal case, use connector mounter to ensure backward compatibility m.alinasMounter = mounter.NewConnectorMounter(inner, "") } return m diff --git a/pkg/nas/mounter_test.go b/pkg/nas/mounter_test.go index 65860d7ad..e98baf5bf 100644 --- a/pkg/nas/mounter_test.go +++ b/pkg/nas/mounter_test.go @@ -1,9 +1,11 @@ package nas import ( + "context" "errors" "testing" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" "github.com/stretchr/testify/assert" mountutils "k8s.io/mount-utils" ) @@ -34,7 +36,7 @@ func TestNasMounter_MountSuccess(t *testing.T) { Interface: &successMockMounter{}, alinasMounter: &successMockMounter{}, } - err := nasMounter.Mount("", "", "nas", []string{}) + err := nasMounter.ExtendedMount(context.Background(), &mounter.MountOperation{}) assert.NoError(t, err) } @@ -43,6 +45,8 @@ func TestNasMounter_FuseMountError(t *testing.T) { Interface: &errorMockMounter{}, alinasMounter: &errorMockMounter{}, } - err := nasMounter.Mount("", "", "cpfs", []string{}) + err := nasMounter.ExtendedMount(context.Background(), &mounter.MountOperation{ + FsType: "cpfs", + }) assert.Error(t, err) } diff --git a/pkg/nas/nodeserver.go b/pkg/nas/nodeserver.go index 5db299284..b095ec867 100644 --- a/pkg/nas/nodeserver.go +++ b/pkg/nas/nodeserver.go @@ -34,6 +34,7 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/dadi" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/losetup" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/internal" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/rund/directvolume" @@ -42,12 +43,11 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" - mountutils "k8s.io/mount-utils" ) type nodeServer struct { config *internal.NodeConfig - mounter mountutils.Interface + mounter mounter.Mounter locks *utils.VolumeLocks recorder record.EventRecorder common.GenericNodeServer @@ -91,6 +91,8 @@ type Options struct { MountProtocol string `json:"mountProtocol"` ClientType string `json:"clientType"` FSType string `json:"fsType"` + AkID string + AkSecret string } // RunvNasOptions struct definition @@ -246,6 +248,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis opt.MountProtocol = strings.TrimSpace(value) } } + opt.AkID = req.Secrets[akIDKey] + opt.AkSecret = req.Secrets[akSecretKey] if cnfsName != "" { cnfs, err := ns.getCNFS(ctx, req, cnfsName) diff --git a/pkg/nas/utils.go b/pkg/nas/utils.go index b0f639825..0a2a08158 100644 --- a/pkg/nas/utils.go +++ b/pkg/nas/utils.go @@ -17,6 +17,7 @@ limitations under the License. package nas import ( + "context" "errors" "fmt" "os" @@ -27,7 +28,8 @@ import ( "github.com/alibabacloud-go/tea/tea" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/losetup" - mounter "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + mounterutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/cloud" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/nas/interfaces" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" @@ -52,6 +54,8 @@ const ( TcpSlotTableEntries = "/proc/sys/sunrpc/tcp_slot_table_entries" TcpSlotTableEntriesValue = "128\n" + akIDKey = "akId" + akSecretKey = "akSecret" filesystemIDKey = "fileSystemId" filesystemTypeKey = "fileSystemType" ) @@ -66,11 +70,12 @@ type RoleAuth struct { Code string } -func doMount(mounter mountutils.Interface, opt *Options, targetPath, volumeId, podUid string, agentMode bool) error { +func doMount(m mounter.Mounter, opt *Options, targetPath, volumeId, podUid string, agentMode bool) error { var ( mountFstype string source string combinedOptions []string + secrets map[string]string isPathNotFound func(error) bool ) if opt.Accesspoint != "" { @@ -81,6 +86,12 @@ func doMount(mounter mountutils.Interface, opt *Options, targetPath, volumeId, p if opt.Options != "" { combinedOptions = append(combinedOptions, opt.Options) } + if opt.AkID != "" && opt.AkSecret != "" { + secrets = map[string]string{ + akIDKey: opt.AkID, + akSecretKey: opt.AkSecret, + } + } switch opt.ClientType { case EFCClient: @@ -128,7 +139,15 @@ func doMount(mounter mountutils.Interface, opt *Options, targetPath, volumeId, p mountFstype = opt.MountProtocol } } - err := mounter.Mount(source, targetPath, mountFstype, combinedOptions) + + err := m.ExtendedMount(context.Background(), &mounter.MountOperation{ + Source: source, + Target: targetPath, + FsType: mountFstype, + Options: combinedOptions, + Secrets: secrets, + VolumeID: volumeId, + }) if err == nil { return nil } @@ -155,16 +174,30 @@ func doMount(mounter mountutils.Interface, opt *Options, targetPath, volumeId, p return err } defer os.Remove(tmpPath) - if err := mounter.Mount(rootSource, tmpPath, mountFstype, combinedOptions); err != nil { + if err := m.ExtendedMount(context.Background(), &mounter.MountOperation{ + Source: rootSource, + Target: tmpPath, + FsType: mountFstype, + Options: combinedOptions, + Secrets: secrets, + VolumeID: volumeId, + }); err != nil { return err } if err := os.MkdirAll(filepath.Join(tmpPath, relPath), os.ModePerm); err != nil { return err } - if err := cleanupMountpoint(mounter, tmpPath); err != nil { + if err := cleanupMountpoint(m, tmpPath); err != nil { klog.Errorf("failed to cleanup tmp mountpoint %s: %v", tmpPath, err) } - return mounter.Mount(source, targetPath, mountFstype, combinedOptions) + return m.ExtendedMount(context.Background(), &mounter.MountOperation{ + Source: source, + Target: targetPath, + FsType: mountFstype, + Options: combinedOptions, + Secrets: secrets, + VolumeID: volumeId, + }) } func isEFCPathNotFoundError(err error) bool { @@ -199,7 +232,7 @@ func ParseMountFlags(mntOptions []string) (string, string) { var vers string var otherOptions []string for _, options := range mntOptions { - for _, option := range mounter.SplitMountOptions(options) { + for _, option := range mounterutils.SplitMountOptions(options) { if option == "" { continue } @@ -219,7 +252,7 @@ func ParseMountFlags(mntOptions []string) (string, string) { func addTLSMountOptions(baseOptions []string) []string { for _, options := range baseOptions { - for _, option := range mounter.SplitMountOptions(options) { + for _, option := range mounterutils.SplitMountOptions(options) { if option == "" { continue } From 76dc555bd02c809b2cfcc2e4eadd1689f866c9e0 Mon Sep 17 00:00:00 2001 From: iltyty Date: Fri, 21 Nov 2025 11:19:01 +0800 Subject: [PATCH 3/6] refactor(mounter): use ossfs secret and monitor interceptors in ossfs mount proxy servers --- pkg/mounter/cmd_mounter.go | 1 + pkg/mounter/interceptors/alinas_secret.go | 6 +- pkg/mounter/interceptors/ossfs2_secret.go | 41 ++++++ pkg/mounter/interceptors/ossfs_monitor.go | 80 ++++++++++++ pkg/mounter/interceptors/ossfs_secret.go | 34 ++--- pkg/mounter/mounter.go | 32 ++++- pkg/mounter/proxy/server/metrics.go | 5 +- pkg/mounter/proxy/server/mount_result.go | 6 + pkg/mounter/proxy/server/ossfs/driver.go | 139 ++++++++++----------- pkg/mounter/proxy/server/ossfs2/driver.go | 144 ++++++++++------------ pkg/oss/nodeserver.go | 2 + 11 files changed, 318 insertions(+), 172 deletions(-) create mode 100644 pkg/mounter/interceptors/ossfs2_secret.go create mode 100644 pkg/mounter/interceptors/ossfs_monitor.go create mode 100644 pkg/mounter/proxy/server/mount_result.go diff --git a/pkg/mounter/cmd_mounter.go b/pkg/mounter/cmd_mounter.go index da3dc6731..3f8fb1eef 100644 --- a/pkg/mounter/cmd_mounter.go +++ b/pkg/mounter/cmd_mounter.go @@ -51,6 +51,7 @@ func getArgs(op *MountOperation) []string { return mount.MakeMountArgs(op.Source, op.Target, "", op.Options) case "ossfs2": args := []string{"mount", op.Target} + args = append(args, op.Args...) for _, o := range op.Options { args = append(args, fmt.Sprintf("--%s", o)) } diff --git a/pkg/mounter/interceptors/alinas_secret.go b/pkg/mounter/interceptors/alinas_secret.go index 7ad4331a5..57b3a04a6 100644 --- a/pkg/mounter/interceptors/alinas_secret.go +++ b/pkg/mounter/interceptors/alinas_secret.go @@ -17,7 +17,7 @@ func NewAlinasSecretInterceptor() mounter.MountInterceptor { return AlinasSecretInterceptor{} } -func (AlinasSecretInterceptor) BeforeMount(op *mounter.MountOperation) (*mounter.MountOperation, error) { +func (AlinasSecretInterceptor) BeforeMount(op *mounter.MountOperation, _ error) (*mounter.MountOperation, error) { if op == nil || op.Secrets == nil { return op, nil } @@ -46,3 +46,7 @@ func makeCredFileContent(secrets map[string]string) []byte { secrets["akSecret"], ) } + +func (AlinasSecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error { + return nil +} diff --git a/pkg/mounter/interceptors/ossfs2_secret.go b/pkg/mounter/interceptors/ossfs2_secret.go new file mode 100644 index 000000000..41d068013 --- /dev/null +++ b/pkg/mounter/interceptors/ossfs2_secret.go @@ -0,0 +1,41 @@ +package interceptors + +import ( + "fmt" + "os" + + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" +) + +type Ossfs2SecretInterceptor struct { + passwdFile string +} + +var _ mounter.MountInterceptor = Ossfs2SecretInterceptor{} + +func NewOssfs2SecretInterceptor() mounter.MountInterceptor { + return Ossfs2SecretInterceptor{} +} + +func (i Ossfs2SecretInterceptor) BeforeMount(req *mounter.MountOperation, _ error) (*mounter.MountOperation, error) { + var err error + i.passwdFile, err = utils.SaveOssSecretsToFile(req.Secrets, req.FsType) + if err != nil { + return req, err + } + if i.passwdFile != "" { + req.Args = append(req.Args, []string{"-c", i.passwdFile}...) + } + return req, nil +} + +func (i Ossfs2SecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error { + if i.passwdFile == "" { + return nil + } + if err := os.Remove(i.passwdFile); err != nil { + return fmt.Errorf("error removing configuration file: %w, mountpoint=%s, path=%s", err, op.Target, i.passwdFile) + } + return nil +} diff --git a/pkg/mounter/interceptors/ossfs_monitor.go b/pkg/mounter/interceptors/ossfs_monitor.go new file mode 100644 index 000000000..482277e50 --- /dev/null +++ b/pkg/mounter/interceptors/ossfs_monitor.go @@ -0,0 +1,80 @@ +package interceptors + +import ( + "errors" + "fmt" + + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" + "k8s.io/mount-utils" +) + +type OssfsMonitorInterceptor struct { + raw mount.Interface + monitorManager *server.MountMonitorManager +} + +var _ mounter.MountInterceptor = OssfsMonitorInterceptor{} + +func NewOssfsMonitorInterceptor() mounter.MountInterceptor { + return OssfsMonitorInterceptor{ + raw: mount.NewWithoutSystemd(""), + monitorManager: server.NewMountMonitorManager(), + } +} + +func (i OssfsMonitorInterceptor) BeforeMount(req *mounter.MountOperation, err error) (*mounter.MountOperation, error) { + if req.MetricsPath == "" { + return req, nil + } + // Get or create monitor for this target + monitor, found := i.monitorManager.GetMountMonitor(req.Target, req.MetricsPath, i.raw, true) + if monitor == nil { + return req, fmt.Errorf("failed to get mount monitor for %s, stop monitoring mountpoint status", req.Target) + } + if found { + monitor.IncreaseMountRetryCount() + } + if err != nil { + monitor.HandleMountFailureOrExit(err) + } + return req, nil +} + +func (i OssfsMonitorInterceptor) AfterMount(op *mounter.MountOperation, err error) error { + if op.MetricsPath == "" { + return nil + } + monitor, _ := i.monitorManager.GetMountMonitor(op.Target, op.MetricsPath, i.raw, true) + if monitor == nil { + return fmt.Errorf("failed to get mount monitor for %s", op.Target) + } + + // Immediate process-exit handling during mount attempt + // Assume the process exits with no error upon receiving SIGTERM, + // and exits with an error in case of unexpected failures. + monitor.HandleMountFailureOrExit(err) + + if op.MountResult == nil { + return nil + } + + res, ok := op.MountResult.(server.OssfsMountResult) + if !ok { + return errors.New("failed to parse ossfs mount result") + } + + go func() { + err := <-res.ExitChan + monitor.HandleMountFailureOrExit(err) + }() + + if err != nil { + return nil + } + + monitor.HandleMountSuccess(res.PID) + // Start monitoring goroutine (ticker based only) + i.monitorManager.StartMonitoring(op.Target) + return nil +} diff --git a/pkg/mounter/interceptors/ossfs_secret.go b/pkg/mounter/interceptors/ossfs_secret.go index 8c7906d5f..daebc7580 100644 --- a/pkg/mounter/interceptors/ossfs_secret.go +++ b/pkg/mounter/interceptors/ossfs_secret.go @@ -1,35 +1,41 @@ package interceptors import ( + "fmt" + "os" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" ) type OssfsSecretInterceptor struct { - credFileKey string + passwdFile string } var _ mounter.MountInterceptor = OssfsSecretInterceptor{} func NewOssfsSecretInterceptor() mounter.MountInterceptor { - return OssfsSecretInterceptor{ - credFileKey: "passwd_file", - } -} - -func NewOssfs2SecretInterceptor() mounter.MountInterceptor { - return OssfsSecretInterceptor{ - credFileKey: "-c", - } + return OssfsSecretInterceptor{} } -func (i OssfsSecretInterceptor) BeforeMount(req *mounter.MountOperation) (*mounter.MountOperation, error) { - filePath, err := utils.SaveOssSecretsToFile(req.Secrets, req.FsType) +func (i OssfsSecretInterceptor) BeforeMount(req *mounter.MountOperation, _ error) (*mounter.MountOperation, error) { + var err error + i.passwdFile, err = utils.SaveOssSecretsToFile(req.Secrets, req.FsType) if err != nil { return req, err } - if filePath != "" { - req.Options = append(req.Options, i.credFileKey+"="+filePath) + if i.passwdFile != "" { + req.Options = append(req.Options, "passwd_file="+i.passwdFile) } return req, nil } + +func (i OssfsSecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error { + if i.passwdFile == "" { + return nil + } + if err := os.Remove(i.passwdFile); err != nil { + return fmt.Errorf("error removing passwd file: %w, mountpoint=%s, path=%s", err, op.Target, i.passwdFile) + } + return nil +} diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go index 8be75f600..8dc385c48 100644 --- a/pkg/mounter/mounter.go +++ b/pkg/mounter/mounter.go @@ -2,7 +2,9 @@ package mounter import ( "context" + "fmt" + "k8s.io/klog/v2" mountutils "k8s.io/mount-utils" ) @@ -16,13 +18,17 @@ type MountOperation struct { Target string FsType string Options []string + Args []string Secrets map[string]string MetricsPath string VolumeID string + + MountResult any } type MountInterceptor interface { - BeforeMount(op *MountOperation) (*MountOperation, error) + BeforeMount(op *MountOperation, err error) (*MountOperation, error) + AfterMount(op *MountOperation, err error) error } type MountWorkflow struct { @@ -32,13 +38,29 @@ type MountWorkflow struct { var _ Mounter = &MountWorkflow{} -func (w *MountWorkflow) ExtendedMount(ctx context.Context, op *MountOperation) (err error) { +func (w *MountWorkflow) ExtendedMount(ctx context.Context, op *MountOperation) error { + var err error + for i, interceptor := range w.interceptors { + if op, err = interceptor.BeforeMount(op, err); err != nil && i != len(w.interceptors)-1 { + // Log error but continue to the next interceptor, since some interceptors may + // want to handle the error, e.g. the OssMonitorInterceptor. + // Only log for the intermediate interceptors, otherwise the final error will be printed twice. + klog.ErrorS(err, "error executing BeforeMount interceptor") + } + } + if err != nil { + return fmt.Errorf("error executing BeforeMount interceptor: %w", err) + } + + err = w.Mounter.ExtendedMount(ctx, op) for _, interceptor := range w.interceptors { - if op, err = interceptor.BeforeMount(op); err != nil { - return err + if afterErr := interceptor.AfterMount(op, err); afterErr != nil { + // Log error but continue to the next interceptor. + // Do not override the original error from mount operation. + klog.ErrorS(afterErr, "error executing AfterMount interceptor") } } - return w.Mounter.ExtendedMount(ctx, op) + return err } func NewForMounter(m Mounter, interceptors ...MountInterceptor) Mounter { diff --git a/pkg/mounter/proxy/server/metrics.go b/pkg/mounter/proxy/server/metrics.go index 0a9523f3f..725c13ff5 100644 --- a/pkg/mounter/proxy/server/metrics.go +++ b/pkg/mounter/proxy/server/metrics.go @@ -3,7 +3,6 @@ package server import ( "fmt" "os" - "os/exec" "path/filepath" "strconv" "sync" @@ -125,15 +124,15 @@ func (m *MountMonitor) HandleMountFailureOrExit(err error) { } // HandleMountSuccess handles the case when mount operation succeeds -func (m *MountMonitor) HandleMountSuccess(process *exec.Cmd) { +func (m *MountMonitor) HandleMountSuccess(pid int) { m.mu.Lock() defer m.mu.Unlock() // Update metrics for mount success m.updateMountPointMetrics(&m.retryCount, nil, nil) + m.Pid = pid m.State = MonitorStateMonitoring - m.Pid = process.Process.Pid klog.InfoS("Mount succeeded", "target", m.Target, "pid", m.Pid) } diff --git a/pkg/mounter/proxy/server/mount_result.go b/pkg/mounter/proxy/server/mount_result.go new file mode 100644 index 000000000..538722182 --- /dev/null +++ b/pkg/mounter/proxy/server/mount_result.go @@ -0,0 +1,6 @@ +package server + +type OssfsMountResult struct { + PID int + ExitChan chan error +} diff --git a/pkg/mounter/proxy/server/ossfs/driver.go b/pkg/mounter/proxy/server/ossfs/driver.go index 31812a552..d9e91fb48 100644 --- a/pkg/mounter/proxy/server/ossfs/driver.go +++ b/pkg/mounter/proxy/server/ossfs/driver.go @@ -14,10 +14,11 @@ import ( "golang.org/x/sys/unix" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/interceptors" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" serverutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -29,18 +30,27 @@ func init() { // Driver manages ossfs mounts and their monitoring type Driver struct { + mounter.Mounter pids *sync.Map monitorManager *server.MountMonitorManager wg sync.WaitGroup - raw mount.Interface } func NewDriver() *Driver { - return &Driver{ + driver := &Driver{ pids: new(sync.Map), monitorManager: server.NewMountMonitorManager(), - raw: mount.NewWithoutSystemd(""), } + m := &extendedMounter{ + driver: driver, + Interface: mount.NewWithoutSystemd(""), + } + driver.Mounter = mounter.NewForMounter( + m, + interceptors.NewOssfsSecretInterceptor(), + interceptors.NewOssfsMonitorInterceptor(), + ) + return driver } func (h *Driver) Name() string { @@ -52,33 +62,51 @@ func (h *Driver) Fstypes() []string { } func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { - options := req.Options - target := req.Target - - // Get or create monitor for this target - var monitor *server.MountMonitor - if req.MetricsPath != "" { - var found bool - monitor, found = h.monitorManager.GetMountMonitor(target, req.MetricsPath, h.raw, true) - if monitor == nil { - klog.Errorf("Failed to get mount monitor for %s, stop monitoring mountpoint status", target) - } else if found { - monitor.IncreaseMountRetryCount() - } - } + return h.ExtendedMount(ctx, &mounter.MountOperation{ + Source: req.Source, + Target: req.Target, + FsType: req.Fstype, + Options: req.Options, + Secrets: req.Secrets, + MetricsPath: req.MetricsPath, + VolumeID: req.VolumeID, + }) +} - // prepare passwd file - passwdFile, err := utils.SaveOssSecretsToFile(req.Secrets, req.Fstype) - if err != nil { - if monitor != nil { - monitor.HandleMountFailureOrExit(err) +func (h *Driver) Init() {} + +func (h *Driver) Terminate() { + // Stop all mount monitoring + h.monitorManager.StopAllMonitoring() + + // terminate all running ossfs + h.pids.Range(func(key, value any) bool { + err := value.(*exec.Cmd).Process.Signal(syscall.SIGTERM) + if err != nil { + klog.ErrorS(err, "Failed to terminate ossfs", "pid", key) } - return err - } - options = append(options, "passwd_file="+passwdFile) + klog.V(4).InfoS("Sended sigterm", "pid", key) + return true + }) + + // wait all ossfs processes and monitoring goroutines to exit + h.monitorManager.WaitForAllMonitoring() + h.wg.Wait() + klog.InfoS("All ossfs processes and monitoring goroutines exited") +} + +type extendedMounter struct { + driver *Driver + mount.Interface +} + +var _ mounter.Mounter = &extendedMounter{} - args := mount.MakeMountArgs(req.Source, req.Target, "", options) - args = append(args, req.MountFlags...) +func (m *extendedMounter) ExtendedMount(ctx context.Context, op *mounter.MountOperation) error { + options := op.Options + target := op.Target + + args := mount.MakeMountArgs(op.Source, op.Target, "", options) args = append(args, "-f") var stderrBuf bytes.Buffer @@ -91,12 +119,9 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { sw.SwitchTarget(os.Stderr) }() - err = cmd.Start() + err := cmd.Start() if err != nil { - if monitor != nil { - monitor.HandleMountFailureOrExit(fmt.Errorf("start ossfs failed: %w", err)) - } - return err + return fmt.Errorf("start ossfs failed: %w", err) } pid := cmd.Process.Pid @@ -111,11 +136,11 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // Wait for mount to complete ossfsExited := make(chan error, 1) - h.wg.Add(1) - h.pids.Store(pid, cmd) + m.driver.wg.Add(1) + m.driver.pids.Store(pid, cmd) go func() { - defer h.wg.Done() - defer h.pids.Delete(pid) + defer m.driver.wg.Done() + defer m.driver.pids.Delete(pid) err := cmd.Wait() if err != nil { @@ -129,17 +154,8 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { } else { klog.InfoS("ossfs exited", "mountpoint", target, "pid", pid) } - // Immediate process-exit handling during mount attempt - // Assume the process exits with no error upon receiving SIGTERM, - // and exits with an error in case of unexpected failures. - if monitor != nil { - monitor.HandleMountFailureOrExit(err) - } // Notify poll loop after metrics are updated ossfsExited <- err - if err := os.Remove(passwdFile); err != nil { - klog.ErrorS(err, "Remove passwd file", "mountpoint", target, "path", passwdFile) - } close(ossfsExited) }() @@ -151,7 +167,7 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { } return false, fmt.Errorf("ossfs exited") default: - notMnt, err := h.raw.IsLikelyNotMountPoint(target) + notMnt, err := m.IsLikelyNotMountPoint(target) if err != nil { klog.ErrorS(err, "check mountpoint", "mountpoint", target) return false, nil @@ -165,10 +181,9 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { }) if err == nil { - if monitor != nil { - monitor.HandleMountSuccess(cmd) - // Start monitoring goroutine (ticker based only) - h.monitorManager.StartMonitoring(target) + op.MountResult = server.OssfsMountResult{ + PID: pid, + ExitChan: ossfsExited, } return nil } @@ -192,25 +207,3 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // Just return the error to caller to avoid double counting. return err } - -func (h *Driver) Init() {} - -func (h *Driver) Terminate() { - // Stop all mount monitoring - h.monitorManager.StopAllMonitoring() - - // terminate all running ossfs - h.pids.Range(func(key, value any) bool { - err := value.(*exec.Cmd).Process.Signal(syscall.SIGTERM) - if err != nil { - klog.ErrorS(err, "Failed to terminate ossfs", "pid", key) - } - klog.V(4).InfoS("Sended sigterm", "pid", key) - return true - }) - - // wait all ossfs processes and monitoring goroutines to exit - h.monitorManager.WaitForAllMonitoring() - h.wg.Wait() - klog.InfoS("All ossfs processes and monitoring goroutines exited") -} diff --git a/pkg/mounter/proxy/server/ossfs2/driver.go b/pkg/mounter/proxy/server/ossfs2/driver.go index a879bb90f..990aeafaa 100644 --- a/pkg/mounter/proxy/server/ossfs2/driver.go +++ b/pkg/mounter/proxy/server/ossfs2/driver.go @@ -14,10 +14,10 @@ import ( "golang.org/x/sys/unix" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/interceptors" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - serverutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -28,18 +28,27 @@ func init() { } type Driver struct { + mounter.Mounter pids *sync.Map monitorManager *server.MountMonitorManager wg sync.WaitGroup - raw mount.Interface } func NewDriver() *Driver { - return &Driver{ + driver := &Driver{ pids: new(sync.Map), monitorManager: server.NewMountMonitorManager(), - raw: mount.NewWithoutSystemd(""), } + m := &extendedMounter{ + driver: driver, + Interface: mount.NewWithoutSystemd(""), + } + driver.Mounter = mounter.NewForMounter( + m, + interceptors.NewOssfs2SecretInterceptor(), + interceptors.NewOssfsMonitorInterceptor(), + ) + return driver } func (h *Driver) Name() string { @@ -51,36 +60,54 @@ func (h *Driver) Fstypes() []string { } func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { - options := req.Options - target := req.Target - - // Get or create monitor for this target - var monitor *server.MountMonitor - if req.MetricsPath != "" { - var found bool - monitor, found = h.monitorManager.GetMountMonitor(target, req.MetricsPath, h.raw, true) - if monitor == nil { - klog.Errorf("Failed to get mount monitor for %s, stop monitoring mountpoint status", target) - } else if found { - monitor.IncreaseMountRetryCount() - } - } + return h.ExtendedMount(ctx, &mounter.MountOperation{ + Source: req.Source, + Target: req.Target, + FsType: req.Fstype, + Options: req.Options, + Secrets: req.Secrets, + MetricsPath: req.MetricsPath, + VolumeID: req.VolumeID, + }) +} - // prepare passwd file - passwdFile, err := utils.SaveOssSecretsToFile(req.Secrets, req.Fstype) - if err != nil { - if monitor != nil { - monitor.HandleMountFailureOrExit(err) +func (h *Driver) Init() {} + +func (h *Driver) Terminate() { + // Stop all mount monitoring + h.monitorManager.StopAllMonitoring() + + // terminate all running ossfs2 + h.pids.Range(func(key, value any) bool { + err := value.(*exec.Cmd).Process.Signal(syscall.SIGTERM) + if err != nil { + klog.ErrorS(err, "Failed to terminate ossfs2", "pid", key) } - return err - } + klog.V(4).InfoS("Sended sigterm", "pid", key) + return true + }) + + // wait all ossfs2 processes and monitoring goroutines to exit + h.monitorManager.WaitForAllMonitoring() + h.wg.Wait() + klog.InfoS("All ossfs2 processes and monitoring goroutines exited") +} + +type extendedMounter struct { + driver *Driver + mount.Interface +} - args := []string{"mount", req.Target} +var _ mounter.Mounter = &extendedMounter{} + +func (m *extendedMounter) ExtendedMount(ctx context.Context, op *mounter.MountOperation) error { + options := op.Options + target := op.Target + + args := []string{"mount", op.Target} // ossfs2.0 forbid to use FUSE args // args = append(args, req.MountFlags...) - if passwdFile != "" { - args = append(args, []string{"-c", passwdFile}...) - } + args = append(args, op.Args...) for _, o := range options { args = append(args, fmt.Sprintf("--%s", o)) } @@ -88,7 +115,7 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { var stderrBuf bytes.Buffer multiWriter := io.MultiWriter(os.Stderr, &stderrBuf) - sw := serverutils.NewSwitchableWriter(multiWriter) + sw := server.NewSwitchableWriter(multiWriter) cmd := exec.Command("ossfs2", args...) cmd.Stdout = os.Stdout cmd.Stderr = sw @@ -96,12 +123,9 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { sw.SwitchTarget(os.Stderr) }() - err = cmd.Start() + err := cmd.Start() if err != nil { - if monitor != nil { - monitor.HandleMountFailureOrExit(fmt.Errorf("start ossfs2 failed: %w", err)) - } - return err + return fmt.Errorf("start ossfs2 failed: %w", err) } pid := cmd.Process.Pid @@ -116,11 +140,11 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // Wait for mount to complete ossfsExited := make(chan error, 1) - h.wg.Add(1) - h.pids.Store(pid, cmd) + m.driver.wg.Add(1) + m.driver.pids.Store(pid, cmd) go func() { - defer h.wg.Done() - defer h.pids.Delete(pid) + defer m.driver.wg.Done() + defer m.driver.pids.Delete(pid) err := cmd.Wait() if err != nil { @@ -134,17 +158,8 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { } else { klog.InfoS("ossfs2 exited", "mountpoint", target, "pid", pid) } - // Immediate process-exit handling during mount attempt - // Assume the process exits with no error upon receiving SIGTERM, - // and exits with an error in case of unexpected failures. - if monitor != nil { - monitor.HandleMountFailureOrExit(err) - } // Notify poll loop after metrics are updated ossfsExited <- err - if err := os.Remove(passwdFile); err != nil { - klog.ErrorS(err, "Remove configuration file", "mountpoint", target, "path", passwdFile) - } close(ossfsExited) }() @@ -156,7 +171,7 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { } return false, fmt.Errorf("ossfs2 exited") default: - notMnt, err := h.raw.IsLikelyNotMountPoint(target) + notMnt, err := m.IsLikelyNotMountPoint(target) if err != nil { klog.ErrorS(err, "check mountpoint", "mountpoint", target) return false, nil @@ -170,10 +185,9 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { }) if err == nil { - if monitor != nil { - monitor.HandleMountSuccess(cmd) - // Start monitoring goroutine (ticker based only) - h.monitorManager.StartMonitoring(target) + op.MountResult = server.OssfsMountResult{ + PID: pid, + ExitChan: ossfsExited, } return nil } @@ -197,25 +211,3 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // Just return the error to caller to avoid double counting. return err } - -func (h *Driver) Init() {} - -func (h *Driver) Terminate() { - // Stop all mount monitoring - h.monitorManager.StopAllMonitoring() - - // terminate all running ossfs2 - h.pids.Range(func(key, value any) bool { - err := value.(*exec.Cmd).Process.Signal(syscall.SIGTERM) - if err != nil { - klog.ErrorS(err, "Failed to terminate ossfs2", "pid", key) - } - klog.V(4).InfoS("Sended sigterm", "pid", key) - return true - }) - - // wait all ossfs2 processes and monitoring goroutines to exit - h.monitorManager.WaitForAllMonitoring() - h.wg.Wait() - klog.InfoS("All ossfs2 processes and monitoring goroutines exited") -} diff --git a/pkg/oss/nodeserver.go b/pkg/oss/nodeserver.go index 11e410310..40d82e245 100644 --- a/pkg/oss/nodeserver.go +++ b/pkg/oss/nodeserver.go @@ -67,6 +67,8 @@ const ( // use unifiedFsType instead var unifiedFsType = OssFsType +// Used by OssCmdMounter only, since ProxyMounter is only the client side, +// the interceptors are applied at the server side. var ossInterceptors = map[string][]mounter.MountInterceptor{ OssFsType: {interceptors.NewOssfsSecretInterceptor()}, OssFs2Type: {interceptors.NewOssfs2SecretInterceptor()}, From e6cc69c61673b274dc8ed80519c4db230abc1d20 Mon Sep 17 00:00:00 2001 From: iltyty Date: Wed, 26 Nov 2025 17:42:36 +0800 Subject: [PATCH 4/6] refactor(mounter): reimplement interceptors following gRPC pattern --- pkg/mounter/interceptors/alinas_secret.go | 25 ++++------ pkg/mounter/interceptors/ossfs2_secret.go | 41 ---------------- pkg/mounter/interceptors/ossfs_monitor.go | 53 ++++++++------------- pkg/mounter/interceptors/ossfs_secret.go | 57 +++++++++++++++-------- pkg/mounter/mounter.go | 52 ++++++++++----------- pkg/mounter/proxy/server/alinas/driver.go | 2 +- pkg/mounter/proxy/server/ossfs/driver.go | 8 ++-- pkg/mounter/proxy/server/ossfs2/driver.go | 4 +- pkg/oss/nodeserver.go | 4 +- 9 files changed, 100 insertions(+), 146 deletions(-) delete mode 100644 pkg/mounter/interceptors/ossfs2_secret.go diff --git a/pkg/mounter/interceptors/alinas_secret.go b/pkg/mounter/interceptors/alinas_secret.go index 57b3a04a6..3666a9bdd 100644 --- a/pkg/mounter/interceptors/alinas_secret.go +++ b/pkg/mounter/interceptors/alinas_secret.go @@ -1,6 +1,7 @@ package interceptors import ( + "context" "fmt" "os" "path" @@ -9,33 +10,29 @@ import ( "k8s.io/klog/v2" ) -type AlinasSecretInterceptor struct{} +var _ mounter.MountInterceptor = AlinasSecretInterceptor -var _ mounter.MountInterceptor = AlinasSecretInterceptor{} - -func NewAlinasSecretInterceptor() mounter.MountInterceptor { - return AlinasSecretInterceptor{} -} - -func (AlinasSecretInterceptor) BeforeMount(op *mounter.MountOperation, _ error) (*mounter.MountOperation, error) { +func AlinasSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error { if op == nil || op.Secrets == nil { - return op, nil + return handler(ctx, op) } tmpDir, err := os.MkdirTemp("", "alinas-") if err != nil { - return op, err + return err } credFileContent := makeCredFileContent(op.Secrets) credFilePath := path.Join(tmpDir, op.VolumeID+".credentials") if err = os.WriteFile(credFilePath, credFileContent, 0o600); err != nil { - return op, err + os.RemoveAll(tmpDir) // cleanup in case of error + return err } klog.V(4).InfoS("Created alinas credential file", "path", credFilePath) op.Options = append(op.Options, "ram_config_file="+credFilePath) - return op, nil + + return handler(ctx, op) } func makeCredFileContent(secrets map[string]string) []byte { @@ -46,7 +43,3 @@ func makeCredFileContent(secrets map[string]string) []byte { secrets["akSecret"], ) } - -func (AlinasSecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error { - return nil -} diff --git a/pkg/mounter/interceptors/ossfs2_secret.go b/pkg/mounter/interceptors/ossfs2_secret.go deleted file mode 100644 index 41d068013..000000000 --- a/pkg/mounter/interceptors/ossfs2_secret.go +++ /dev/null @@ -1,41 +0,0 @@ -package interceptors - -import ( - "fmt" - "os" - - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" -) - -type Ossfs2SecretInterceptor struct { - passwdFile string -} - -var _ mounter.MountInterceptor = Ossfs2SecretInterceptor{} - -func NewOssfs2SecretInterceptor() mounter.MountInterceptor { - return Ossfs2SecretInterceptor{} -} - -func (i Ossfs2SecretInterceptor) BeforeMount(req *mounter.MountOperation, _ error) (*mounter.MountOperation, error) { - var err error - i.passwdFile, err = utils.SaveOssSecretsToFile(req.Secrets, req.FsType) - if err != nil { - return req, err - } - if i.passwdFile != "" { - req.Args = append(req.Args, []string{"-c", i.passwdFile}...) - } - return req, nil -} - -func (i Ossfs2SecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error { - if i.passwdFile == "" { - return nil - } - if err := os.Remove(i.passwdFile); err != nil { - return fmt.Errorf("error removing configuration file: %w, mountpoint=%s, path=%s", err, op.Target, i.passwdFile) - } - return nil -} diff --git a/pkg/mounter/interceptors/ossfs_monitor.go b/pkg/mounter/interceptors/ossfs_monitor.go index 482277e50..50e2743ac 100644 --- a/pkg/mounter/interceptors/ossfs_monitor.go +++ b/pkg/mounter/interceptors/ossfs_monitor.go @@ -1,54 +1,38 @@ package interceptors import ( + "context" "errors" - "fmt" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" + "k8s.io/klog/v2" "k8s.io/mount-utils" ) -type OssfsMonitorInterceptor struct { - raw mount.Interface - monitorManager *server.MountMonitorManager -} +var _ mounter.MountInterceptor = OssfsMonitorInterceptor -var _ mounter.MountInterceptor = OssfsMonitorInterceptor{} +var ( + raw = mount.NewWithoutSystemd("") + monitorManager = server.NewMountMonitorManager() +) -func NewOssfsMonitorInterceptor() mounter.MountInterceptor { - return OssfsMonitorInterceptor{ - raw: mount.NewWithoutSystemd(""), - monitorManager: server.NewMountMonitorManager(), +func OssfsMonitorInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error { + if op == nil || op.MetricsPath == "" { + return handler(ctx, op) } -} -func (i OssfsMonitorInterceptor) BeforeMount(req *mounter.MountOperation, err error) (*mounter.MountOperation, error) { - if req.MetricsPath == "" { - return req, nil - } // Get or create monitor for this target - monitor, found := i.monitorManager.GetMountMonitor(req.Target, req.MetricsPath, i.raw, true) + monitor, found := monitorManager.GetMountMonitor(op.Target, op.MetricsPath, raw, true) if monitor == nil { - return req, fmt.Errorf("failed to get mount monitor for %s, stop monitoring mountpoint status", req.Target) + klog.ErrorS(errors.New("failed to get mount monitor"), "stop monitoring mountpoint status", "mountpoint", op.Target) + return handler(ctx, op) } if found { monitor.IncreaseMountRetryCount() } - if err != nil { - monitor.HandleMountFailureOrExit(err) - } - return req, nil -} -func (i OssfsMonitorInterceptor) AfterMount(op *mounter.MountOperation, err error) error { - if op.MetricsPath == "" { - return nil - } - monitor, _ := i.monitorManager.GetMountMonitor(op.Target, op.MetricsPath, i.raw, true) - if monitor == nil { - return fmt.Errorf("failed to get mount monitor for %s", op.Target) - } + err := handler(ctx, op) // Immediate process-exit handling during mount attempt // Assume the process exits with no error upon receiving SIGTERM, @@ -56,12 +40,13 @@ func (i OssfsMonitorInterceptor) AfterMount(op *mounter.MountOperation, err erro monitor.HandleMountFailureOrExit(err) if op.MountResult == nil { - return nil + return err } res, ok := op.MountResult.(server.OssfsMountResult) if !ok { - return errors.New("failed to parse ossfs mount result") + klog.ErrorS(errors.New("failed to assert ossfs mount result type"), "skipping monitoring of mountpoint", "mountpoint", op.Target) + return err } go func() { @@ -70,11 +55,11 @@ func (i OssfsMonitorInterceptor) AfterMount(op *mounter.MountOperation, err erro }() if err != nil { - return nil + return err } monitor.HandleMountSuccess(res.PID) // Start monitoring goroutine (ticker based only) - i.monitorManager.StartMonitoring(op.Target) + monitorManager.StartMonitoring(op.Target) return nil } diff --git a/pkg/mounter/interceptors/ossfs_secret.go b/pkg/mounter/interceptors/ossfs_secret.go index daebc7580..c6299e5e2 100644 --- a/pkg/mounter/interceptors/ossfs_secret.go +++ b/pkg/mounter/interceptors/ossfs_secret.go @@ -1,41 +1,60 @@ package interceptors import ( - "fmt" + "context" + "errors" "os" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" + "k8s.io/klog/v2" ) -type OssfsSecretInterceptor struct { - passwdFile string -} +var _ mounter.MountInterceptor = OssfsSecretInterceptor -var _ mounter.MountInterceptor = OssfsSecretInterceptor{} +func OssfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error { + return ossfsSecretInterceptor(ctx, op, handler, "ossfs") +} -func NewOssfsSecretInterceptor() mounter.MountInterceptor { - return OssfsSecretInterceptor{} +func Ossfs2SecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error { + return ossfsSecretInterceptor(ctx, op, handler, "ossfs2") } -func (i OssfsSecretInterceptor) BeforeMount(req *mounter.MountOperation, _ error) (*mounter.MountOperation, error) { - var err error - i.passwdFile, err = utils.SaveOssSecretsToFile(req.Secrets, req.FsType) +func ossfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler, fuseType string) error { + passwdFile, err := utils.SaveOssSecretsToFile(op.Secrets, op.FsType) if err != nil { - return req, err + return err } - if i.passwdFile != "" { - req.Options = append(req.Options, "passwd_file="+i.passwdFile) + if passwdFile != "" { + if fuseType == "ossfs" { + op.Args = append(op.Args, "passwd_file="+passwdFile) + } else { + op.Args = append(op.Args, []string{"-c", passwdFile}...) + } } - return req, nil -} -func (i OssfsSecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error { - if i.passwdFile == "" { + if err = handler(ctx, op); err != nil { + return err + } + + if passwdFile == "" || op.MountResult == nil { return nil } - if err := os.Remove(i.passwdFile); err != nil { - return fmt.Errorf("error removing passwd file: %w, mountpoint=%s, path=%s", err, op.Target, i.passwdFile) + result, ok := op.MountResult.(*server.OssfsMountResult) + if !ok { + klog.ErrorS( + errors.New("failed to assert ossfs mount result"), + "skipping cleanup of passwd file", "mountpoint", op.Target, "path", passwdFile, + ) + return nil } + + go func() { + <-result.ExitChan + if err := os.Remove(passwdFile); err != nil { + klog.ErrorS(err, "failed to cleanup ossfs passwd file", "mountpoint", op.Target, "path", passwdFile) + } + }() return nil } diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go index 8dc385c48..fe56cc99a 100644 --- a/pkg/mounter/mounter.go +++ b/pkg/mounter/mounter.go @@ -2,9 +2,7 @@ package mounter import ( "context" - "fmt" - "k8s.io/klog/v2" mountutils "k8s.io/mount-utils" ) @@ -26,46 +24,46 @@ type MountOperation struct { MountResult any } -type MountInterceptor interface { - BeforeMount(op *MountOperation, err error) (*MountOperation, error) - AfterMount(op *MountOperation, err error) error -} +type MountHandler func(ctx context.Context, op *MountOperation) error + +type MountInterceptor func(ctx context.Context, op *MountOperation, handler MountHandler) error type MountWorkflow struct { Mounter - interceptors []MountInterceptor + chainedHandler MountHandler } var _ Mounter = &MountWorkflow{} func (w *MountWorkflow) ExtendedMount(ctx context.Context, op *MountOperation) error { - var err error - for i, interceptor := range w.interceptors { - if op, err = interceptor.BeforeMount(op, err); err != nil && i != len(w.interceptors)-1 { - // Log error but continue to the next interceptor, since some interceptors may - // want to handle the error, e.g. the OssMonitorInterceptor. - // Only log for the intermediate interceptors, otherwise the final error will be printed twice. - klog.ErrorS(err, "error executing BeforeMount interceptor") - } + return w.chainedHandler(ctx, op) +} + +// chainInterceptors creates a chain of interceptors similar to gRPC +func chainInterceptors(interceptors []MountInterceptor, finalHandler MountHandler) MountHandler { + if len(interceptors) == 0 { + return finalHandler } - if err != nil { - return fmt.Errorf("error executing BeforeMount interceptor: %w", err) + + return func(ctx context.Context, op *MountOperation) error { + return interceptors[0](ctx, op, getChainHandler(interceptors, 0, finalHandler)) + } +} + +// getChainHandler creates a handler that chains interceptors recursively +func getChainHandler(interceptors []MountInterceptor, curr int, finalHandler MountHandler) MountHandler { + if curr == len(interceptors)-1 { + return finalHandler } - err = w.Mounter.ExtendedMount(ctx, op) - for _, interceptor := range w.interceptors { - if afterErr := interceptor.AfterMount(op, err); afterErr != nil { - // Log error but continue to the next interceptor. - // Do not override the original error from mount operation. - klog.ErrorS(afterErr, "error executing AfterMount interceptor") - } + return func(ctx context.Context, op *MountOperation) error { + return interceptors[curr+1](ctx, op, getChainHandler(interceptors, curr+1, finalHandler)) } - return err } func NewForMounter(m Mounter, interceptors ...MountInterceptor) Mounter { return &MountWorkflow{ - Mounter: m, - interceptors: interceptors, + Mounter: m, + chainedHandler: chainInterceptors(interceptors, m.ExtendedMount), } } diff --git a/pkg/mounter/proxy/server/alinas/driver.go b/pkg/mounter/proxy/server/alinas/driver.go index 608cdc4a7..4fe2bef6b 100644 --- a/pkg/mounter/proxy/server/alinas/driver.go +++ b/pkg/mounter/proxy/server/alinas/driver.go @@ -30,7 +30,7 @@ func init() { server.RegisterDriver(&Driver{ Mounter: mounter.NewForMounter( &extendedMounter{Interface: mount.New("")}, - interceptors.NewAlinasSecretInterceptor(), + interceptors.AlinasSecretInterceptor, ), }) } diff --git a/pkg/mounter/proxy/server/ossfs/driver.go b/pkg/mounter/proxy/server/ossfs/driver.go index d9e91fb48..64724bafb 100644 --- a/pkg/mounter/proxy/server/ossfs/driver.go +++ b/pkg/mounter/proxy/server/ossfs/driver.go @@ -18,7 +18,6 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/interceptors" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" - serverutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -47,8 +46,8 @@ func NewDriver() *Driver { } driver.Mounter = mounter.NewForMounter( m, - interceptors.NewOssfsSecretInterceptor(), - interceptors.NewOssfsMonitorInterceptor(), + interceptors.OssfsSecretInterceptor, + interceptors.OssfsMonitorInterceptor, ) return driver } @@ -107,11 +106,12 @@ func (m *extendedMounter) ExtendedMount(ctx context.Context, op *mounter.MountOp target := op.Target args := mount.MakeMountArgs(op.Source, op.Target, "", options) + args = append(args, op.Args...) args = append(args, "-f") var stderrBuf bytes.Buffer multiWriter := io.MultiWriter(os.Stderr, &stderrBuf) - sw := serverutils.NewSwitchableWriter(multiWriter) + sw := server.NewSwitchableWriter(multiWriter) cmd := exec.Command("ossfs", args...) cmd.Stdout = os.Stdout cmd.Stderr = sw diff --git a/pkg/mounter/proxy/server/ossfs2/driver.go b/pkg/mounter/proxy/server/ossfs2/driver.go index 990aeafaa..c76bac5c6 100644 --- a/pkg/mounter/proxy/server/ossfs2/driver.go +++ b/pkg/mounter/proxy/server/ossfs2/driver.go @@ -45,8 +45,8 @@ func NewDriver() *Driver { } driver.Mounter = mounter.NewForMounter( m, - interceptors.NewOssfs2SecretInterceptor(), - interceptors.NewOssfsMonitorInterceptor(), + interceptors.Ossfs2SecretInterceptor, + interceptors.OssfsMonitorInterceptor, ) return driver } diff --git a/pkg/oss/nodeserver.go b/pkg/oss/nodeserver.go index 40d82e245..327e3133a 100644 --- a/pkg/oss/nodeserver.go +++ b/pkg/oss/nodeserver.go @@ -70,8 +70,8 @@ var unifiedFsType = OssFsType // Used by OssCmdMounter only, since ProxyMounter is only the client side, // the interceptors are applied at the server side. var ossInterceptors = map[string][]mounter.MountInterceptor{ - OssFsType: {interceptors.NewOssfsSecretInterceptor()}, - OssFs2Type: {interceptors.NewOssfs2SecretInterceptor()}, + OssFsType: {interceptors.OssfsSecretInterceptor}, + OssFs2Type: {interceptors.Ossfs2SecretInterceptor}, } func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { From 822c9f648852da61515364f271c529a506da359c Mon Sep 17 00:00:00 2001 From: iltyty Date: Thu, 27 Nov 2025 15:12:16 +0800 Subject: [PATCH 5/6] fix(nas): atomically update alinas credential file via temp file and rename --- pkg/mounter/interceptors/alinas_secret.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/mounter/interceptors/alinas_secret.go b/pkg/mounter/interceptors/alinas_secret.go index 3666a9bdd..e3a0333e6 100644 --- a/pkg/mounter/interceptors/alinas_secret.go +++ b/pkg/mounter/interceptors/alinas_secret.go @@ -17,15 +17,26 @@ func AlinasSecretInterceptor(ctx context.Context, op *mounter.MountOperation, ha return handler(ctx, op) } - tmpDir, err := os.MkdirTemp("", "alinas-") + tmpCredFile, err := os.CreateTemp("", op.VolumeID+"-*.credentials") if err != nil { return err } + defer func() { + if err = os.Remove(tmpCredFile.Name()); err != nil && !os.IsNotExist(err) { + klog.ErrorS(err, "Failed to remove temporary alinas credential file", "path", tmpCredFile.Name()) + } + }() credFileContent := makeCredFileContent(op.Secrets) - credFilePath := path.Join(tmpDir, op.VolumeID+".credentials") - if err = os.WriteFile(credFilePath, credFileContent, 0o600); err != nil { - os.RemoveAll(tmpDir) // cleanup in case of error + if _, err = tmpCredFile.Write(credFileContent); err != nil { + return err + } + if err = tmpCredFile.Close(); err != nil { + return err + } + + credFilePath := path.Join(os.TempDir(), op.VolumeID+".credentials") + if err = os.Rename(tmpCredFile.Name(), credFilePath); err != nil { return err } From 7aa22c433607f5d4f61861d6a1671864df7e4b7b Mon Sep 17 00:00:00 2001 From: iltyty Date: Fri, 28 Nov 2025 09:51:16 +0800 Subject: [PATCH 6/6] Add pkg/mounter tests --- pkg/mounter/interceptors/alinas_secret.go | 5 +- .../interceptors/alinas_secret_test.go | 120 ++++++++++++++++ .../interceptors/ossfs_monitor_test.go | 112 +++++++++++++++ pkg/mounter/interceptors/ossfs_secret.go | 6 +- pkg/mounter/interceptors/ossfs_secret_test.go | 128 ++++++++++++++++++ pkg/mounter/mounter.go | 2 +- pkg/mounter/mounter_test.go | 68 ++++++++++ 7 files changed, 437 insertions(+), 4 deletions(-) create mode 100644 pkg/mounter/interceptors/alinas_secret_test.go create mode 100644 pkg/mounter/interceptors/ossfs_monitor_test.go create mode 100644 pkg/mounter/interceptors/ossfs_secret_test.go create mode 100644 pkg/mounter/mounter_test.go diff --git a/pkg/mounter/interceptors/alinas_secret.go b/pkg/mounter/interceptors/alinas_secret.go index e3a0333e6..484f59c41 100644 --- a/pkg/mounter/interceptors/alinas_secret.go +++ b/pkg/mounter/interceptors/alinas_secret.go @@ -10,6 +10,7 @@ import ( "k8s.io/klog/v2" ) +var credDir = os.TempDir() var _ mounter.MountInterceptor = AlinasSecretInterceptor func AlinasSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error { @@ -17,7 +18,7 @@ func AlinasSecretInterceptor(ctx context.Context, op *mounter.MountOperation, ha return handler(ctx, op) } - tmpCredFile, err := os.CreateTemp("", op.VolumeID+"-*.credentials") + tmpCredFile, err := os.CreateTemp(credDir, op.VolumeID+"-*.credentials") if err != nil { return err } @@ -35,7 +36,7 @@ func AlinasSecretInterceptor(ctx context.Context, op *mounter.MountOperation, ha return err } - credFilePath := path.Join(os.TempDir(), op.VolumeID+".credentials") + credFilePath := path.Join(credDir, op.VolumeID+".credentials") if err = os.Rename(tmpCredFile.Name(), credFilePath); err != nil { return err } diff --git a/pkg/mounter/interceptors/alinas_secret_test.go b/pkg/mounter/interceptors/alinas_secret_test.go new file mode 100644 index 000000000..b4c3e7702 --- /dev/null +++ b/pkg/mounter/interceptors/alinas_secret_test.go @@ -0,0 +1,120 @@ +package interceptors + +import ( + "context" + "fmt" + "os" + "path" + "sync" + "testing" + "time" + + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/stretchr/testify/assert" +) + +var ( + successMountHandler = func(context.Context, *mounter.MountOperation) error { + return nil + } + failureMountHandler = func(context.Context, *mounter.MountOperation) error { + return fmt.Errorf("failed") + } +) + +func TestAlinasSecretInterceptor(t *testing.T) { + credDir = t.TempDir() + print(credDir) + tests := []struct { + name string + concurrent bool + handler mounter.MountHandler + op *mounter.MountOperation + expectErr bool + }{ + { + name: "nil operation", + handler: successMountHandler, + }, + { + name: "nil secrets", + handler: successMountHandler, + op: &mounter.MountOperation{}, + }, + { + name: "mount error reservation", + handler: failureMountHandler, + op: &mounter.MountOperation{}, + expectErr: true, + }, + { + name: "normal operation", + handler: successMountHandler, + op: &mounter.MountOperation{ + Secrets: map[string]string{ + "akId": "akid", + "akSecret": "aksecret", + }, + VolumeID: "volume-id", + }, + }, + { + name: "concurrent operations", + concurrent: true, + handler: successMountHandler, + op: &mounter.MountOperation{ + Secrets: map[string]string{ + "akId": "akid", + "akSecret": "aksecret", + }, + VolumeID: "volume-id", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var errs []error + var mutex sync.Mutex + + if !tt.concurrent { + if err := AlinasSecretInterceptor(context.Background(), tt.op, tt.handler); err != nil { + errs = append(errs, err) + } + } else { + var wg sync.WaitGroup + for range 10 { + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(time.Millisecond) + err := AlinasSecretInterceptor(context.Background(), tt.op, tt.handler) + mutex.Lock() + if err != nil { + errs = append(errs, err) + } + mutex.Unlock() + }() + } + wg.Wait() + } + + if tt.expectErr { + assert.NotEmpty(t, errs) + return + } + + assert.Nil(t, errs) + if tt.op == nil || tt.op.Secrets == nil { + return + } + + content, err := os.ReadFile(path.Join(credDir, tt.op.VolumeID+".credentials")) + assert.NoError(t, err) + assert.Equal( + t, + fmt.Sprintf("[NASCredentials]\naccessKeyID=%s\naccessKeySecret=%s", tt.op.Secrets["akId"], tt.op.Secrets["akSecret"]), + string(content)) + }) + } +} diff --git a/pkg/mounter/interceptors/ossfs_monitor_test.go b/pkg/mounter/interceptors/ossfs_monitor_test.go new file mode 100644 index 000000000..ad27b744b --- /dev/null +++ b/pkg/mounter/interceptors/ossfs_monitor_test.go @@ -0,0 +1,112 @@ +package interceptors + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" + "github.com/stretchr/testify/assert" +) + +func TestOssfsMonitorInterceptor(t *testing.T) { + metricsDir := t.TempDir() + tests := []struct { + name string + handler mounter.MountHandler + op *mounter.MountOperation + expectErr bool + }{ + { + name: "nil operation", + handler: successMountHandler, + }, + { + name: "nil metrics path", + handler: successMountHandler, + op: &mounter.MountOperation{}, + }, + { + name: "mount error reservation", + handler: failureMountHandler, + op: &mounter.MountOperation{ + MetricsPath: metricsDir, + }, + expectErr: true, + }, + { + name: "nil mount result", + handler: successMountHandler, + op: &mounter.MountOperation{ + MetricsPath: metricsDir, + Target: "target1", + }, + }, + { + name: "invalid mount result", + handler: successMountHandler, + op: &mounter.MountOperation{ + MetricsPath: metricsDir, + MountResult: "invalid", + Target: "target2", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := OssfsMonitorInterceptor(context.Background(), tt.op, tt.handler) + if tt.expectErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + if tt.op == nil || tt.op.MetricsPath == "" { + return + } + + monitor, found := monitorManager.GetMountMonitor(tt.op.Target, tt.op.MetricsPath, raw, false) + assert.True(t, found) + assert.NotNil(t, monitor) + + }) + } + + monitorManager.StopAllMonitoring() + monitorManager.WaitForAllMonitoring() + monitorManager = server.NewMountMonitorManager() + defer func() { + monitorManager.StopAllMonitoring() + monitorManager.WaitForAllMonitoring() + }() + + op := &mounter.MountOperation{ + Target: "volume1", + MetricsPath: metricsDir, + MountResult: server.OssfsMountResult{ + PID: 123, + ExitChan: make(chan error), + }, + } + err := OssfsMonitorInterceptor(context.Background(), op, successMountHandler) + assert.NoError(t, err) + monitor, found := monitorManager.GetMountMonitor(op.Target, op.MetricsPath, raw, false) + assert.True(t, found) + assert.NotNil(t, monitor) + assertMountMetricValue(t, op.MetricsPath, utils.MetricsMountPointStatus, "0") + assertMountMetricValue(t, op.MetricsPath, utils.MetricsMountRetryCount, "0") + + err = OssfsMonitorInterceptor(context.Background(), op, failureMountHandler) + assert.Error(t, err) + assertMountMetricValue(t, op.MetricsPath, utils.MetricsMountPointStatus, "1") + assertMountMetricValue(t, op.MetricsPath, utils.MetricsMountRetryCount, "1") +} + +func assertMountMetricValue(t *testing.T, metricsDir, metricsFile string, expected string) { + actual, err := os.ReadFile(filepath.Join(metricsDir, metricsFile)) + assert.NoError(t, err) + assert.Equal(t, expected, string(actual)) +} diff --git a/pkg/mounter/interceptors/ossfs_secret.go b/pkg/mounter/interceptors/ossfs_secret.go index c6299e5e2..d98e3a48b 100644 --- a/pkg/mounter/interceptors/ossfs_secret.go +++ b/pkg/mounter/interceptors/ossfs_secret.go @@ -22,6 +22,10 @@ func Ossfs2SecretInterceptor(ctx context.Context, op *mounter.MountOperation, ha } func ossfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler, fuseType string) error { + if op == nil || op.Secrets == nil { + return handler(ctx, op) + } + passwdFile, err := utils.SaveOssSecretsToFile(op.Secrets, op.FsType) if err != nil { return err @@ -41,7 +45,7 @@ func ossfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, han if passwdFile == "" || op.MountResult == nil { return nil } - result, ok := op.MountResult.(*server.OssfsMountResult) + result, ok := op.MountResult.(server.OssfsMountResult) if !ok { klog.ErrorS( errors.New("failed to assert ossfs mount result"), diff --git a/pkg/mounter/interceptors/ossfs_secret_test.go b/pkg/mounter/interceptors/ossfs_secret_test.go new file mode 100644 index 000000000..ab2647b30 --- /dev/null +++ b/pkg/mounter/interceptors/ossfs_secret_test.go @@ -0,0 +1,128 @@ +package interceptors + +import ( + "context" + "testing" + "time" + + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" + "github.com/stretchr/testify/assert" +) + +var mockOssfsHandler = func(ctx context.Context, op *mounter.MountOperation) error { + result := server.OssfsMountResult{ + PID: 123, + ExitChan: make(chan error), + } + go func() { + time.Sleep(500 * time.Millisecond) + close(result.ExitChan) + }() + op.MountResult = result + return nil +} + +func TestOssfsSecretInterceptor(t *testing.T) { + tests := []struct { + name string + handler mounter.MountHandler + op *mounter.MountOperation + expectErr bool + expectFile bool + }{ + { + name: "nil operation", + handler: successMountHandler, + }, + { + name: "nil secrets", + handler: successMountHandler, + }, + { + name: "mount error reservation", + handler: failureMountHandler, + expectErr: true, + }, + { + name: "ni mount result", + handler: successMountHandler, + op: &mounter.MountOperation{ + FsType: "ossfs", + Secrets: map[string]string{ + "passwd-ossfs": "akid:aksecret:bucket", + }, + }, + expectFile: true, + }, + { + name: "invalid mount result", + handler: successMountHandler, + op: &mounter.MountOperation{ + FsType: "ossfs", + MountResult: "invalid", + Secrets: map[string]string{ + "passwd-ossfs": "akid:aksecret:bucket", + }, + }, + expectFile: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := OssfsSecretInterceptor(context.Background(), tt.op, tt.handler) + if tt.expectErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + if tt.op == nil { + return + } + + assert.Len(t, tt.op.Args, 1) + assert.Contains(t, tt.op.Args[0], "passwd_file=") + + filePath := tt.op.Args[0][len("passwd_file="):] + if tt.expectFile { + assert.FileExists(t, filePath) + } else { + assert.NoFileExists(t, filePath) + } + }) + } + + op := &mounter.MountOperation{ + FsType: "ossfs", + Secrets: map[string]string{ + "passwd-ossfs": "akid:aksecret:bucket", + }, + } + err := OssfsSecretInterceptor(context.Background(), op, mockOssfsHandler) + assert.NoError(t, err) + + result := op.MountResult.(server.OssfsMountResult) + <-result.ExitChan + + assert.Len(t, op.Args, 1) + assert.Contains(t, op.Args[0], "passwd_file=") + time.Sleep(500 * time.Millisecond) // Wait for ossfs monitor interceptor to cleanup the credential file + assert.NoFileExists(t, op.Args[0][len("passwd_file="):]) +} + +func TestOssfs2SecretInterceptor(t *testing.T) { + op := &mounter.MountOperation{ + FsType: "ossfs2", + Secrets: map[string]string{ + "passwd-ossfs2": "akid:aksecret:bucket", + }, + } + err := Ossfs2SecretInterceptor(context.Background(), op, successMountHandler) + assert.NoError(t, err) + assert.Len(t, op.Args, 2) + assert.Equal(t, op.Args[0], "-c") + assert.NotEmpty(t, op.Args[1]) + assert.FileExists(t, op.Args[1]) +} diff --git a/pkg/mounter/mounter.go b/pkg/mounter/mounter.go index fe56cc99a..c12c6375c 100644 --- a/pkg/mounter/mounter.go +++ b/pkg/mounter/mounter.go @@ -52,7 +52,7 @@ func chainInterceptors(interceptors []MountInterceptor, finalHandler MountHandle // getChainHandler creates a handler that chains interceptors recursively func getChainHandler(interceptors []MountInterceptor, curr int, finalHandler MountHandler) MountHandler { - if curr == len(interceptors)-1 { + if curr >= len(interceptors)-1 { return finalHandler } diff --git a/pkg/mounter/mounter_test.go b/pkg/mounter/mounter_test.go new file mode 100644 index 000000000..2931eb2b7 --- /dev/null +++ b/pkg/mounter/mounter_test.go @@ -0,0 +1,68 @@ +package mounter + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +var successHandler = func(context.Context, *MountOperation) error { + return nil +} +var failureHandler = func(context.Context, *MountOperation) error { + return errors.New("mount failed") +} + +func Test_chainInterceptors(t *testing.T) { + count := 0 + interceptor := func(ctx context.Context, op *MountOperation, handler MountHandler) error { + count++ + return handler(ctx, op) + } + interceptors := []MountInterceptor{interceptor, interceptor} + + handler := chainInterceptors(interceptors, successHandler) + err := handler(context.Background(), &MountOperation{}) + assert.NoError(t, err) + assert.Equal(t, 2, count) + + count = 0 + handler = chainInterceptors(nil, successHandler) + err = handler(context.Background(), &MountOperation{}) + assert.NoError(t, err) + assert.Equal(t, 0, count) + + count = 0 + handler = chainInterceptors(interceptors, failureHandler) + err = handler(context.Background(), &MountOperation{}) + assert.Error(t, err) + assert.Equal(t, 2, count) +} + +func TestGetChainHandler(t *testing.T) { + count := 0 + interceptor := func(ctx context.Context, op *MountOperation, handler MountHandler) error { + count++ + return handler(ctx, op) + } + interceptors := []MountInterceptor{interceptor, interceptor} + + handler := getChainHandler(interceptors, 0, successHandler) + err := handler(context.Background(), &MountOperation{}) + assert.NoError(t, err) + assert.Equal(t, 1, count) + + count = 0 + handler = getChainHandler(interceptors, 1, successHandler) + err = handler(context.Background(), &MountOperation{}) + assert.NoError(t, err) + assert.Equal(t, 0, count) + + count = 0 + handler = getChainHandler(interceptors, 0, failureHandler) + err = handler(context.Background(), &MountOperation{}) + assert.Error(t, err) + assert.Equal(t, 1, count) +}