diff --git a/cmd/mount-proxy-client/main.go b/cmd/mount-proxy-client/main.go index cf04773fd..10702df96 100644 --- a/cmd/mount-proxy-client/main.go +++ b/cmd/mount-proxy-client/main.go @@ -31,7 +31,11 @@ func main() { data, _ := json.MarshalIndent(req, "", "\t") fmt.Println(string(data)) - dclient := client.NewClient(socketPath) + dclient, err := client.NewClient(socketPath) + if err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } resp, err := dclient.Mount(&req) if err != nil { fmt.Println(err.Error()) diff --git a/pkg/features/features.go b/pkg/features/features.go index ed76dbc0e..3233aa383 100644 --- a/pkg/features/features.go +++ b/pkg/features/features.go @@ -46,6 +46,15 @@ const ( // This configuration only takes effect for newly mounted OSS volumes. UpdatedOssfsVersion featuregate.Feature = "UpdatedOssfsVersion" + // EnableOssfsRecovery enables OSSFS recovery. + // + // OSSFS recovery is a feature that allows OSSFS to recover from a failure state. + // When EnableOssfsRecovery is enabled, if ossfs exits unexpectedly due to non-SIGTERM reasons during operation, + // the daemon process in the container will be responsible for restarting ossfs without restarting the business + // container that mounted the OSS volume, thus achieving mount point self-healing. + // Note: Mount point self-healing depends on a specially optimized version of FUSE. + EnableOssfsRecovery featuregate.Feature = "EnableOssfsRecovery" + RundCSIProtocol3 featuregate.Feature = "RundCSIProtocol3" // Enable volume group snapshots. @@ -70,6 +79,7 @@ var ( defaultOSSFeatureGate = map[featuregate.Feature]featuregate.FeatureSpec{ UpdatedOssfsVersion: {Default: true, PreRelease: featuregate.Beta}, + EnableOssfsRecovery: {Default: false, PreRelease: featuregate.Alpha}, } defaultNasFeatureGate = map[featuregate.Feature]featuregate.FeatureSpec{ diff --git a/pkg/mounter/oss/common.go b/pkg/mounter/oss/common.go index 28bc7b48a..1d62d22b1 100644 --- a/pkg/mounter/oss/common.go +++ b/pkg/mounter/oss/common.go @@ -30,6 +30,17 @@ const ( ) func setDefaultImage(fuseType string, m metadata.MetadataProvider, config *mounterutils.FuseContainerConfig) { + // set recovery image + // TODO: remove this after recovery capacity reaches beta status. + if features.FunctionalMutableFeatureGate.Enabled(features.EnableOssfsRecovery) { + switch fuseType { + case OssFsType: + config.RecoveryImage = fmt.Sprintf("csi-%s:%s", fuseType, defaultOssfsRecoveryImageTag) + case OssFs2Type: + config.RecoveryImage = fmt.Sprintf("csi-%s:%s", fuseType, defaultOssfs2RecoveryImageTag) + } + } + // deprecated if config.Image != "" { return diff --git a/pkg/mounter/oss/oss_fuse_manager.go b/pkg/mounter/oss/oss_fuse_manager.go index a4fdd1204..c9a717389 100644 --- a/pkg/mounter/oss/oss_fuse_manager.go +++ b/pkg/mounter/oss/oss_fuse_manager.go @@ -86,4 +86,5 @@ type Options struct { AuthType string `json:"authType"` FuseType string `json:"fuseType"` ReadOnly bool `json:"readOnly"` + Recovery bool `json:"recovery"` // Recovery will be enabled if attributes OR featuregate is enabled } diff --git a/pkg/mounter/oss/ossfs.go b/pkg/mounter/oss/ossfs.go index b27be577b..65e0c696d 100644 --- a/pkg/mounter/oss/ossfs.go +++ b/pkg/mounter/oss/ossfs.go @@ -21,6 +21,10 @@ import ( var defaultOssfsImageTag = "v1.88.4-80d165c-aliyun" var defaultOssfsUpdatedImageTag = "v1.91.7.ack.1-570be5f-aliyun" + +// TODO: The recovery-enabled image will modify the underlying FUSE-related dynamic libraries. +// After the Recovery capability reaches beta status, these will be unified to the optimized libraries. +var defaultOssfsRecoveryImageTag = "v1.91.7.ack.1-recovery-570be5f-aliyun" var defaultOssfsDbglevel = utils.DebugLevelError const ( diff --git a/pkg/mounter/oss/ossfs2.go b/pkg/mounter/oss/ossfs2.go index 1e0d795fa..cb7740342 100644 --- a/pkg/mounter/oss/ossfs2.go +++ b/pkg/mounter/oss/ossfs2.go @@ -18,6 +18,9 @@ import ( ) var defaultOssfs2ImageTag = "v2.0.1.ack.1-ecb0808-aliyun" +// TODO: The recovery-enabled image will modify the underlying FUSE-related dynamic libraries. +// After the Recovery capability reaches beta status, these will be unified to the optimized libraries. +var defaultOssfs2RecoveryImageTag = "v2.0.1.ack.1-recovery-ecb0808-aliyun" var defaultOssfs2Dbglevel = utils.DebugLevelInfo type fuseOssfs2 struct { diff --git a/pkg/mounter/proxy/client/client.go b/pkg/mounter/proxy/client/client.go index e98d55190..2f7d376bc 100644 --- a/pkg/mounter/proxy/client/client.go +++ b/pkg/mounter/proxy/client/client.go @@ -11,11 +11,13 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "golang.org/x/sys/unix" "k8s.io/klog/v2" + "k8s.io/mount-utils" ) const ( // this should be longer than default timeout in server defaultTimeout = time.Second * 35 + FuseMountType = "fuse" ) type Client interface { @@ -23,15 +25,21 @@ type Client interface { } type client struct { + mount.MounterForceUnmounter timeout time.Duration socketPath string } -func NewClient(socketPath string) *client { - return &client{ - socketPath: socketPath, - timeout: defaultTimeout, +func NewClient(socketPath string) (*client, error) { + m, ok := mount.New("").(mount.MounterForceUnmounter) + if !ok { + return nil, errors.New("failed to cast mounter to MounterForceUnmounter") } + return &client{ + MounterForceUnmounter: m, + socketPath: socketPath, + timeout: defaultTimeout, + }, nil } func (c *client) doRequest(req *proxy.Request) (*proxy.Response, error) { @@ -57,7 +65,22 @@ func (c *client) doRequest(req *proxy.Request) (*proxy.Response, error) { socket := int(connf.Fd()) defer connf.Close() - err = unix.Sendmsg(socket, append(data, proxy.MessageEnd), nil, nil, 0) + // 1. open /dev/fuse as root + fuseFd, err := unix.Open("/dev/fuse", unix.O_RDWR, 0o644) + if err != nil { + return nil, fmt.Errorf("open /dev/fuse: %w", err) + } + defer unix.Close(fuseFd) + + // 2. mount FUSE filesystem with Fd + err = c.mountFuseFilesystemWithFd(req, fuseFd) + if err != nil { + return nil, err + } + + // 3. send fd with unix conn + oob := unix.UnixRights(fuseFd) + err = unix.Sendmsg(socket, append(data, proxy.MessageEnd), oob, nil, 0) if err != nil { return nil, fmt.Errorf("sendmsg: %w", err) } @@ -93,3 +116,27 @@ func (c *client) Mount(req *proxy.MountRequest) (*proxy.Response, error) { Body: req, }) } + +func (c *client) mountFuseFilesystemWithFd(req *proxy.Request, fd int) error { + mountReq, ok := req.Body.(*proxy.MountRequest) + if !ok { + return errors.New("invalid request body") + } + // 2.1 split FUSE options + // ex: rw,nosuid,nodev,relatime,user_id=0,group_id=0,allow_other + // fuseOptions, daemonOptions := splitFuseOptions(mountReq.Options) + // 2.2 add fd=`fuseFd` option + // fuseOptions = append(fuseOptions, fmt.Sprintf("fd=%v", fd)) + fuseOptions, daemonOptions, err := splitFuseOptions(mountReq.Options) + if err != nil { + return err + } + fuseOptions = append(fuseOptions, fmt.Sprintf("fd=%v", fd)) + err = c.MountSensitiveWithoutSystemdWithMountFlags(mountReq.Source, mountReq.Target, FuseMountType, fuseOptions, nil, []string{"--internal-only"}) + if err != nil { + return fmt.Errorf("failed to mount the fuse filesystem: %w", err) + } + mountReq.Options = daemonOptions + req.Body = mountReq + return nil +} diff --git a/pkg/mounter/proxy/client/fuse_options.go b/pkg/mounter/proxy/client/fuse_options.go new file mode 100644 index 000000000..0d753d2e6 --- /dev/null +++ b/pkg/mounter/proxy/client/fuse_options.go @@ -0,0 +1,109 @@ +package client + +import ( + "fmt" + "os" + "strconv" + "strings" + + "k8s.io/apimachinery/pkg/util/sets" +) + +const NULLVAL string = "&NULLVAL" + +var defaultFuseOptionsMap = map[string]string{ + "nodev": NULLVAL, + "nosuid": NULLVAL, + "rootmode": "40000", + // Note: user_id and group_id are the owner of the FUSE filesystem. Together with + // allow_other, they enable other users to access the FUSE filesystem. The uid and + // gid parameters of ossfs are application-level parameters used to simulate file permissions. + "allow_other": NULLVAL, + "default_permissions": NULLVAL, + "user_id": strconv.Itoa(os.Getuid()), + "group_id": strconv.Itoa(os.Getgid()), +} + +type FuseOptionType int + +const ( + SupportedOption FuseOptionType = iota + UnsupportedOption + IgnoredOption +) + +// Refer: https://man7.org/linux/man-pages/man8/mount.fuse3.8.html +// Note: Disable dev and suid options for security reasons. Disable rw option +// because it's the default mode, and we only need to set ro to disable write +// access, avoiding conflicts with ro settings from other mount points. +var fuseOptionKeys = map[string]FuseOptionType{ + "exec": SupportedOption, + "noexec": SupportedOption, + "atime": SupportedOption, + "noatime": SupportedOption, + "sync": SupportedOption, + "async": SupportedOption, + "dirsync": SupportedOption, + "debug": SupportedOption, + "ro": SupportedOption, + "dev": UnsupportedOption, + "suid": UnsupportedOption, + "auto_unmount": UnsupportedOption, + "rw": IgnoredOption, +} + +func splitFuseOptions(options []string) (fuseOptions, daemonOptions []string, err error) { + fuseOptionsMap := make(map[string]string) + optionSet := sets.New(options...) + + // inital with default fuse options + for key, val := range defaultFuseOptionsMap { + fuseOptionsMap[key] = val + } + + for _, o := range options { + kv := strings.SplitN(o, "=", 2) + // empty kv, invalid + if len(kv) == 0 { + optionSet.Delete(o) + continue + } + key := kv[0] + if key == "" { + optionSet.Delete(o) + continue + } + v, ok := defaultFuseOptionsMap[key] + // if the key is default, compare the value + // if not equal, update the defualt value + if ok { + if v != NULLVAL && len(kv) == 2 && v != kv[1] { + fuseOptionsMap[key] = kv[1] + } + continue + } + // if the key is not for fuse, see as daemon options + t, ok := fuseOptionKeys[key] + if !ok { + continue + } + switch t { + case SupportedOption: + fuseOptionsMap[key] = NULLVAL + case IgnoredOption: + optionSet.Delete(o) + case UnsupportedOption: + return nil, nil, fmt.Errorf("Unsupported option: %s", o) + } + } + daemonOptions = optionSet.UnsortedList() + for k, v := range fuseOptionsMap { + if v == NULLVAL { + fuseOptions = append(fuseOptions, k) + } else { + fuseOptions = append(fuseOptions, fmt.Sprintf("%s=%s", k, v)) + } + } + + return +} diff --git a/pkg/mounter/proxy/client/fuse_options_test.go b/pkg/mounter/proxy/client/fuse_options_test.go new file mode 100644 index 000000000..d636391ea --- /dev/null +++ b/pkg/mounter/proxy/client/fuse_options_test.go @@ -0,0 +1,116 @@ +package client + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +var defaultFuseOptions = []string{ + "nodev", + "nosuid", + "rootmode=40000", + "allow_other", + "default_permissions", + fmt.Sprintf("user_id=%d", os.Getuid()), + fmt.Sprintf("group_id=%d", os.Getegid()), +} + +func TestSplitFuseOptions(t *testing.T) { + var modifyFuseOptions, debugFuseoptions []string + + modifyFuseOptions = append(modifyFuseOptions, defaultFuseOptions...) + modifyFuseOptions[2] = "rootmode=0600" + + debugFuseoptions = append(debugFuseoptions, defaultFuseOptions...) + debugFuseoptions = append(debugFuseoptions, "debug") + + tests := []struct { + name string + options []string + defaultFuseOpts map[string]string + fuseOptionKeys map[string]int + nullVal string + expectedFuse []string + expectedDaemon []string + expectError bool + }{ + { + name: "Empty options", + options: []string{}, + expectedFuse: defaultFuseOptions, + expectedDaemon: []string{}, + expectError: false, + }, + { + name: "Only default options with same values", + options: []string{"allow_other", "rootmode=40000"}, + expectedFuse: defaultFuseOptions, + expectedDaemon: []string{"allow_other", "rootmode=40000"}, + expectError: false, + }, + { + name: "Default options with different values", + options: []string{"rootmode=0600", "default_permissions"}, + expectedFuse: modifyFuseOptions, + expectedDaemon: []string{"rootmode=0600", "default_permissions"}, + expectError: false, + }, + { + name: "Supported non-default options", + options: []string{"debug"}, + expectedFuse: debugFuseoptions, + expectedDaemon: []string{"debug"}, + expectError: false, + }, + { + name: "Ignored options", + options: []string{"rw", "allow_other"}, + expectedFuse: defaultFuseOptions, + expectedDaemon: []string{"allow_other"}, + expectError: false, + }, + { + name: "Unsupported options should return error", + options: []string{"dev"}, + expectedFuse: nil, + expectedDaemon: nil, + expectError: true, + }, + { + name: "Daemon options", + options: []string{"another_daemon_opt"}, + expectedFuse: defaultFuseOptions, + expectedDaemon: []string{"another_daemon_opt"}, + expectError: false, + }, + { + name: "Mixed options", + options: []string{"allow_other", "daemon_opt=value"}, + expectedFuse: defaultFuseOptions, + expectedDaemon: []string{"allow_other", "daemon_opt=value"}, + expectError: false, + }, + { + name: "Invalid empty kv option", + options: []string{""}, + expectedFuse: defaultFuseOptions, + expectedDaemon: []string{}, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fuseOptions, daemonOptions, err := splitFuseOptions(tt.options) + assert.Equal(t, tt.expectError, err != nil) + if tt.expectError { + return + } + assert.ElementsMatch(t, tt.expectedFuse, fuseOptions) + assert.ElementsMatch(t, tt.expectedDaemon, daemonOptions) + }) + } +} diff --git a/pkg/mounter/proxy/protocol.go b/pkg/mounter/proxy/protocol.go index 911e4ad53..c43e02bcb 100644 --- a/pkg/mounter/proxy/protocol.go +++ b/pkg/mounter/proxy/protocol.go @@ -39,6 +39,7 @@ type MountRequest struct { Source string `json:"source,omitempty"` Target string `json:"target,omitempty"` Fstype string `json:"fstype,omitempty"` + Recovery bool `json:"recovery,omitempty"` Options []string `json:"options,omitempty"` MountFlags []string `json:"mountFlags,omitempty"` Secrets map[string]string `json:"secrets,omitempty"` diff --git a/pkg/mounter/proxy/server/alinas/driver.go b/pkg/mounter/proxy/server/alinas/driver.go index 255b43e2a..feedbe8ac 100644 --- a/pkg/mounter/proxy/server/alinas/driver.go +++ b/pkg/mounter/proxy/server/alinas/driver.go @@ -40,7 +40,7 @@ func (h *Driver) Fstypes() []string { return []string{fstypeAlinas, fstypeCpfsNfs} } -func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { +func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest, fuseFd int) error { klog.InfoS("Mounting", "fstype", req.Fstype, "source", req.Source, "target", req.Target, "options", req.Options) options := append(req.Options, "no_start_watchdog") if req.Fstype == fstypeAlinas { diff --git a/pkg/mounter/proxy/server/driver.go b/pkg/mounter/proxy/server/driver.go index 3c04a80da..ba91622ed 100644 --- a/pkg/mounter/proxy/server/driver.go +++ b/pkg/mounter/proxy/server/driver.go @@ -3,8 +3,14 @@ package server import ( "context" "fmt" + "os" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" + "k8s.io/klog/v2" +) + +const ( + FuseConnectionsDir = "/sys/fs/fuse/connections" ) type Driver interface { @@ -12,7 +18,7 @@ type Driver interface { Fstypes() []string Init() Terminate() - Mount(ctx context.Context, req *proxy.MountRequest) error + Mount(ctx context.Context, req *proxy.MountRequest, fuseFd int) error } var ( @@ -24,10 +30,30 @@ func RegisterDriver(driver Driver) { nameToDriver[driver.Name()] = driver } -func handleMountRequest(ctx context.Context, req *proxy.MountRequest) error { +func handleMountRequest(ctx context.Context, req *proxy.MountRequest, fuseFd int) error { h := fstypeToDriver[req.Fstype] if h == nil { return fmt.Errorf("fstype %q not supported", req.Fstype) } - return h.Mount(ctx, req) + return h.Mount(ctx, req, fuseFd) +} + +func CleanPasswdFile(passwdFile, target string) { + if err := os.Remove(passwdFile); err != nil { + klog.ErrorS(err, "Remove passwd file", "mountpoint", target, "path", passwdFile) + } +} + +func SafeSend(ch chan error, err error) (closed bool) { + if ch == nil { + return + } + defer func() { + if recover() != nil { + closed = true + } + }() + + ch <- err + return false } diff --git a/pkg/mounter/proxy/server/handler.go b/pkg/mounter/proxy/server/handler.go index 16041ce6e..6be31d633 100644 --- a/pkg/mounter/proxy/server/handler.go +++ b/pkg/mounter/proxy/server/handler.go @@ -41,15 +41,39 @@ func Handle(conn net.Conn, timeout time.Duration) error { klog.V(4).InfoS("Start to recvmsg") p := make([]byte, proxy.MaxMsgSize) - n, _, _, _, err := unix.Recvmsg(socket, p, nil, 0) + oob := make([]byte, unix.CmsgSpace(4)) // 4 bytes for the file descriptor + n, oobn, _, _, err := unix.Recvmsg(socket, p, oob, 0) if err != nil { return fmt.Errorf("recvmsg: %w", err) } - klog.V(4).InfoS("Succeeded to recvmsg", "n", n) + klog.V(4).InfoS("Succeeded to recvmsg", "n", n, "oobn", oobn) ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() + // parse fd + msgs, err := unix.ParseSocketControlMessage(oob[:oobn]) + if err != nil { + return fmt.Errorf("parse socket control message: %w", err) + } + + if len(msgs) == 0 { + return errors.New("no file descriptor received") + } + + fds, err := unix.ParseUnixRights(&msgs[0]) + if err != nil { + return fmt.Errorf("parse unix rights: %w", err) + } + + if len(fds) == 0 { + return errors.New("no file descriptor received") + } + + fuseFd := fds[0] + klog.V(4).InfoS("Succeeded to parse fd", "fuseFd", fuseFd) + + // parse mount info var resp proxy.Response req, err := parseRawRequest(p[:n]) if err != nil { @@ -57,7 +81,7 @@ func Handle(conn net.Conn, timeout time.Duration) error { Error: err.Error(), } } else { - resp = handle(ctx, req) + resp = handle(ctx, req, fuseFd) } data, err := json.Marshal(resp) @@ -88,7 +112,7 @@ func parseRawRequest(data []byte) (*rawRequest, error) { return &req, json.Unmarshal(data[:end], &req) } -func handle(ctx context.Context, req *rawRequest) proxy.Response { +func handle(ctx context.Context, req *rawRequest, fuseFd int) proxy.Response { switch req.Header.Method { case proxy.Mount: var mountReq proxy.MountRequest @@ -98,7 +122,7 @@ func handle(ctx context.Context, req *rawRequest) proxy.Response { Error: err.Error(), } } - err = handleMountRequest(ctx, &mountReq) + err = handleMountRequest(ctx, &mountReq, fuseFd) if err != nil { return proxy.Response{ Error: err.Error(), diff --git a/pkg/mounter/proxy/server/ossfs/driver.go b/pkg/mounter/proxy/server/ossfs/driver.go index 4f345c30c..d6bfd6699 100644 --- a/pkg/mounter/proxy/server/ossfs/driver.go +++ b/pkg/mounter/proxy/server/ossfs/driver.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "sync" "syscall" "time" @@ -14,6 +15,7 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" + "github.com/moby/sys/mountinfo" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -44,8 +46,34 @@ func (h *Driver) Fstypes() []string { return []string{"ossfs"} } -func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { +func (h *Driver) runCmd(req *proxy.MountRequest, fuseFd int) (cmd *exec.Cmd, err error) { + // 1. use /dev/fd/3 as target for ossfs + args := mount.MakeMountArgs(req.Source, "/dev/fd/3", "", req.Options) + args = append(args, req.MountFlags...) + args = append(args, "-f") + + cmd = exec.Command("ossfs", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + // 2. pase fd on /dev/fuse by fuseFd + // Notes: Mountproxy client (csi-plugin) opens a fd on /dev/fuse, + // client passes it by unix conn. + // Mountproxy server (ossfs pod) receives it by unix conn, + // server mounts with ossfs with /dev/fd/3 (on /dev/fuse) as target. + cmd.ExtraFiles = []*os.File{os.NewFile(uintptr(fuseFd), "/dev/fuse")} + + err = cmd.Start() + if err != nil { + err = fmt.Errorf("start ossfs failed: %w", err) + return + } + klog.InfoS("Started ossfs", "pid", cmd.Process.Pid, "args", cmd.Args) + return +} + +func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest, fuseFd int) error { options := req.Options + target := req.Target // prepare passwd file var passwdFile string @@ -63,40 +91,65 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { options = append(options, "passwd_file="+passwdFile) } - args := mount.MakeMountArgs(req.Source, req.Target, "", options) - args = append(args, req.MountFlags...) - args = append(args, "-f") - - cmd := exec.Command("ossfs", args...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - err := cmd.Start() + cmd, err := h.runCmd(req, fuseFd) if err != nil { return fmt.Errorf("start ossfs failed: %w", err) } - target := req.Target - pid := cmd.Process.Pid - klog.InfoS("Started ossfs", "pid", pid, "args", args) + info, err := mountinfo.GetMounts(func(i *mountinfo.Info) (skip bool, stop bool) { + return i.Mountpoint != target, i.Mountpoint == target + }) + if len(info) == 0 { + return fmt.Errorf("mountpoint %s is not mounted", target) + } + chanId := info[0].Minor ossfsExited := make(chan error, 1) h.wg.Add(1) - h.pids.Store(pid, cmd) go func() { defer h.wg.Done() - defer h.pids.Delete(pid) + // release to avoid unexpected forking + defer syscall.Close(fuseFd) + defer server.CleanPasswdFile(passwdFile, target) - err := cmd.Wait() - if err != nil { - klog.ErrorS(err, "ossfs exited with error", "mountpoint", target, "pid", pid) - } else { - klog.InfoS("ossfs exited", "mountpoint", target, "pid", pid) - } - ossfsExited <- err - if err := os.Remove(passwdFile); err != nil { - klog.ErrorS(err, "Remove passwd file", "mountpoint", target, "path", passwdFile) + for { + pid := cmd.Process.Pid + h.pids.Store(pid, cmd) + + err = cmd.Wait() + if err != nil { + klog.ErrorS(err, "ossfs exited with error", "mountpoint", target, "pid", pid, "errorCode", cmd.ProcessState.ExitCode()) + } else { + klog.InfoS("ossfs exited", "mountpoint", target, "pid", pid) + } + h.pids.Delete(pid) + + hasClosed := server.SafeSend(ossfsExited, err) + if !hasClosed { + return + } + + waitStatus, ok := cmd.ProcessState.Sys().(syscall.WaitStatus) + if ok && waitStatus.Signal() == syscall.SIGTERM { + return + } + + err := fmt.Errorf("Ossfs exited, need recovery") + klog.ErrorS(err, "mountpoint", target) + klog.InfoS("Flush fuse connection", "chanId", chanId, "mountpoint", target) + err = os.WriteFile(filepath.Join(server.FuseConnectionsDir, strconv.Itoa(chanId), "flush"), []byte("1"), 0o600) + if err != nil { + klog.ErrorS(err, "Failed to flush fuse connection", "chanId", chanId, "mountpoint", target) + return + } + + cmd, err = h.runCmd(req, fuseFd) + if err != nil { + return + } } + }() + defer func() { close(ossfsExited) }() @@ -130,14 +183,14 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // terminate ossfs process when timeout terr := cmd.Process.Signal(syscall.SIGTERM) if terr != nil { - klog.ErrorS(err, "Failed to terminate ossfs", "pid", pid) + klog.ErrorS(err, "Failed to terminate ossfs", "pid", cmd.Process.Pid) } select { case <-ossfsExited: case <-time.After(time.Second * 2): kerr := cmd.Process.Kill() if kerr != nil && errors.Is(kerr, os.ErrProcessDone) { - klog.ErrorS(err, "Failed to kill ossfs", "pid", pid) + klog.ErrorS(err, "Failed to kill ossfs", "pid", cmd.Process.Pid) } } } diff --git a/pkg/mounter/proxy/server/ossfs2/driver.go b/pkg/mounter/proxy/server/ossfs2/driver.go index 084c7da85..235cba86c 100644 --- a/pkg/mounter/proxy/server/ossfs2/driver.go +++ b/pkg/mounter/proxy/server/ossfs2/driver.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "sync" "syscall" "time" @@ -14,6 +15,7 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" + "github.com/moby/sys/mountinfo" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "k8s.io/mount-utils" @@ -44,8 +46,38 @@ func (h *Driver) Fstypes() []string { return []string{"ossfs2"} } -func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { - options := req.Options +func (h *Driver) runCmd(req *proxy.MountRequest, passwdFile string, fuseFd int) (cmd *exec.Cmd, err error) { + // 1. use /dev/fd/3 as target for ossfs + args := []string{"mount", "/dev/fd/3"} + // ossfs2.0 forbid to use FUSE args + // args = append(args, req.MountFlags...) + args = append(args, []string{"-c", passwdFile}...) + for _, o := range req.Options { + args = append(args, fmt.Sprintf("--%s", o)) + } + args = append(args, "-f") + + cmd = exec.Command("ossfs2", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + // 2. pase fd on /dev/fuse by fuseFd + // Notes: Mountproxy client (csi-plugin) opens a fd on /dev/fuse, + // client passes it by unix conn. + // Mountproxy server (ossfs pod) receives it by unix conn, + // server mounts with ossfs with /dev/fd/3 (on /dev/fuse) as target. + cmd.ExtraFiles = []*os.File{os.NewFile(uintptr(fuseFd), "/dev/fuse")} + + err = cmd.Start() + if err != nil { + err = fmt.Errorf("start ossfs2 failed: %w", err) + return + } + klog.InfoS("Started ossfs2", "pid", cmd.Process.Pid, "args", cmd.Args) + return +} + +func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest, fuseFd int) error { + target := req.Target // prepare passwd file var passwdFile string @@ -62,45 +94,65 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { klog.V(4).InfoS("created ossfs2 configuration file", "path", passwdFile) } - args := []string{"mount", req.Target} - // ossfs2.0 forbid to use FUSE args - // args = append(args, req.MountFlags...) - args = append(args, []string{"-c", passwdFile}...) - for _, o := range options { - args = append(args, fmt.Sprintf("--%s", o)) - } - args = append(args, "-f") - - cmd := exec.Command("ossfs2", args...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - err := cmd.Start() + cmd, err := h.runCmd(req, passwdFile, fuseFd) if err != nil { return fmt.Errorf("start ossfs2 failed: %w", err) } - target := req.Target - pid := cmd.Process.Pid - klog.InfoS("Started ossfs2", "pid", pid, "args", args) + info, err := mountinfo.GetMounts(func(i *mountinfo.Info) (skip bool, stop bool) { + return i.Mountpoint != target, i.Mountpoint == target + }) + if len(info) == 0 { + return fmt.Errorf("mountpoint %s is not mounted", target) + } + chanId := info[0].Minor ossfsExited := make(chan error, 1) h.wg.Add(1) - h.pids.Store(pid, cmd) go func() { defer h.wg.Done() - defer h.pids.Delete(pid) + // release to avoid unexpected forking + defer syscall.Close(fuseFd) + defer server.CleanPasswdFile(passwdFile, target) - err := cmd.Wait() - if err != nil { - klog.ErrorS(err, "ossfs2 exited with error", "mountpoint", target, "pid", pid) - } else { - klog.InfoS("ossfs2 exited", "mountpoint", target, "pid", pid) - } - ossfsExited <- err - if err := os.Remove(passwdFile); err != nil { - klog.ErrorS(err, "Remove configuration file", "mountpoint", target, "path", passwdFile) + for { + pid := cmd.Process.Pid + h.pids.Store(pid, cmd) + + err = cmd.Wait() + if err != nil { + klog.ErrorS(err, "ossfs2 exited with error", "mountpoint", target, "pid", pid) + } else { + klog.InfoS("ossfs2 exited", "mountpoint", target, "pid", pid) + } + h.pids.Delete(pid) + + hasClosed := server.SafeSend(ossfsExited, err) + if !hasClosed { + return + } + + waitStatus, ok := cmd.ProcessState.Sys().(syscall.WaitStatus) + if ok && waitStatus.Signal() == syscall.SIGTERM { + return + } + + err = fmt.Errorf("Ossfs2 exited, need recovery") + klog.ErrorS(err, "mountpoint", target) + klog.InfoS("Flush fuse connection", "chanId", chanId, "mountpoint", target) + err = os.WriteFile(filepath.Join(server.FuseConnectionsDir, strconv.Itoa(chanId), "flush"), []byte("1"), 0o600) + if err != nil { + klog.ErrorS(err, "Failed to flush fuse connection", "chanId", chanId, "mountpoint", target) + return + } + + cmd, err = h.runCmd(req, passwdFile, fuseFd) + if err != nil { + return + } } + }() + defer func() { close(ossfsExited) }() @@ -134,14 +186,14 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error { // terminate ossfs process when timeout terr := cmd.Process.Signal(syscall.SIGTERM) if terr != nil { - klog.ErrorS(err, "Failed to terminate ossfs2", "pid", pid) + klog.ErrorS(err, "Failed to terminate ossfs2", "pid", cmd.Process.Pid) } select { case <-ossfsExited: case <-time.After(time.Second * 2): kerr := cmd.Process.Kill() if kerr != nil && errors.Is(kerr, os.ErrProcessDone) { - klog.ErrorS(err, "Failed to kill ossfs2", "pid", pid) + klog.ErrorS(err, "Failed to kill ossfs2", "pid", cmd.Process.Pid) } } } diff --git a/pkg/mounter/proxy_mounter.go b/pkg/mounter/proxy_mounter.go index 68780e986..a99af7e90 100644 --- a/pkg/mounter/proxy_mounter.go +++ b/pkg/mounter/proxy_mounter.go @@ -22,7 +22,10 @@ func NewProxyMounter(socketPath string, inner mountutils.Interface) *ProxyMounte } func (m *ProxyMounter) MountWithSecrets(source, target, fstype string, options []string, secrets map[string]string) error { - dclient := client.NewClient(m.socketPath) + dclient, err := client.NewClient(m.socketPath) + if err != nil { + return fmt.Errorf("call mounter daemon: %w", err) + } resp, err := dclient.Mount(&proxy.MountRequest{ Source: source, Target: target, diff --git a/pkg/mounter/utils/fuse_pod_manager.go b/pkg/mounter/utils/fuse_pod_manager.go index f063b81f7..79107ce4e 100644 --- a/pkg/mounter/utils/fuse_pod_manager.go +++ b/pkg/mounter/utils/fuse_pod_manager.go @@ -74,6 +74,7 @@ type FusePodContext struct { VolumeId string FuseType string AuthConfig *AuthConfig + Recovery bool } type FuseMounterType interface { @@ -83,13 +84,15 @@ type FuseMounterType interface { } type FuseContainerConfig struct { - Resources corev1.ResourceRequirements - Image string - ImageTag string - Dbglevel string - Annotations map[string]string - Labels map[string]string - Extra map[string]string + Resources corev1.ResourceRequirements + Image string + // TODO: remove if Recovery feature reaches beta status. + RecoveryImage string + ImageTag string + Dbglevel string + Annotations map[string]string + Labels map[string]string + Extra map[string]string } func ExtractFuseContainerConfig(configmap *corev1.ConfigMap, name string) (config FuseContainerConfig) { diff --git a/pkg/oss/controllerserver.go b/pkg/oss/controllerserver.go index ef88e5523..c39bf62ea 100644 --- a/pkg/oss/controllerserver.go +++ b/pkg/oss/controllerserver.go @@ -151,6 +151,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs VolumeId: req.VolumeId, AuthConfig: authCfg, FuseType: opts.FuseType, + Recovery: opts.Recovery, }, controllerPublishPath, false) if err != nil { return nil, status.Errorf(codes.Internal, "failed to create %s pod: %v", opts.FuseType, err) diff --git a/pkg/oss/utils.go b/pkg/oss/utils.go index 100903815..2d1ea6d4c 100644 --- a/pkg/oss/utils.go +++ b/pkg/oss/utils.go @@ -27,6 +27,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata" cnfsv1beta1 "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cnfs/v1beta1" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/features" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/oss" mounter "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" @@ -150,6 +151,8 @@ func parseOptions(volOptions, secrets map[string]string, volCaps []*csi.VolumeCa default: klog.Warning(WrapOssError(ParamError, "the value(%q) of %q is invalid, only support direct and subpath", v, k).Error()) } + case "recovery": + opts.Recovery = true } } for _, c := range volCaps { @@ -165,7 +168,9 @@ func parseOptions(volOptions, secrets map[string]string, volCaps []*csi.VolumeCa if readOnly { opts.ReadOnly = true } - + if features.FunctionalMutableFeatureGate.Enabled(features.EnableOssfsRecovery) { + opts.Recovery = true + } // default fuseType is ossfs if opts.FuseType == "" { opts.FuseType = OssFsType