Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/mount-proxy-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ func main() {
data, _ := json.MarshalIndent(req, "", "\t")
fmt.Println(string(data))

dclient := client.NewClient(socketPath)
dclient, err := client.NewClient(socketPath)
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
resp, err := dclient.Mount(&req)
if err != nil {
fmt.Println(err.Error())
Expand Down
10 changes: 10 additions & 0 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ const (
// This configuration only takes effect for newly mounted OSS volumes.
UpdatedOssfsVersion featuregate.Feature = "UpdatedOssfsVersion"

// EnableOssfsRecovery enables OSSFS recovery.
//
// OSSFS recovery is a feature that allows OSSFS to recover from a failure state.
// When EnableOssfsRecovery is enabled, if ossfs exits unexpectedly due to non-SIGTERM reasons during operation,
// the daemon process in the container will be responsible for restarting ossfs without restarting the business
// container that mounted the OSS volume, thus achieving mount point self-healing.
// Note: Mount point self-healing depends on a specially optimized version of FUSE.
EnableOssfsRecovery featuregate.Feature = "EnableOssfsRecovery"

RundCSIProtocol3 featuregate.Feature = "RundCSIProtocol3"

// Enable volume group snapshots.
Expand All @@ -70,6 +79,7 @@ var (

defaultOSSFeatureGate = map[featuregate.Feature]featuregate.FeatureSpec{
UpdatedOssfsVersion: {Default: true, PreRelease: featuregate.Beta},
EnableOssfsRecovery: {Default: false, PreRelease: featuregate.Alpha},
}

defaultNasFeatureGate = map[featuregate.Feature]featuregate.FeatureSpec{
Expand Down
11 changes: 11 additions & 0 deletions pkg/mounter/oss/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ const (
)

func setDefaultImage(fuseType string, m metadata.MetadataProvider, config *mounterutils.FuseContainerConfig) {
// set recovery image
// TODO: remove this after recovery capacity reaches beta status.
if features.FunctionalMutableFeatureGate.Enabled(features.EnableOssfsRecovery) {
switch fuseType {
case OssFsType:
config.RecoveryImage = fmt.Sprintf("csi-%s:%s", fuseType, defaultOssfsRecoveryImageTag)
case OssFs2Type:
config.RecoveryImage = fmt.Sprintf("csi-%s:%s", fuseType, defaultOssfs2RecoveryImageTag)
}
}

// deprecated
if config.Image != "" {
return
Expand Down
1 change: 1 addition & 0 deletions pkg/mounter/oss/oss_fuse_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,5 @@ type Options struct {
AuthType string `json:"authType"`
FuseType string `json:"fuseType"`
ReadOnly bool `json:"readOnly"`
Recovery bool `json:"recovery"` // Recovery will be enabled if attributes OR featuregate is enabled
}
4 changes: 4 additions & 0 deletions pkg/mounter/oss/ossfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (

var defaultOssfsImageTag = "v1.88.4-80d165c-aliyun"
var defaultOssfsUpdatedImageTag = "v1.91.7.ack.1-570be5f-aliyun"

// TODO: The recovery-enabled image will modify the underlying FUSE-related dynamic libraries.
// After the Recovery capability reaches beta status, these will be unified to the optimized libraries.
var defaultOssfsRecoveryImageTag = "v1.91.7.ack.1-recovery-570be5f-aliyun"
var defaultOssfsDbglevel = utils.DebugLevelError

const (
Expand Down
3 changes: 3 additions & 0 deletions pkg/mounter/oss/ossfs2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
)

var defaultOssfs2ImageTag = "v2.0.1.ack.1-ecb0808-aliyun"
// TODO: The recovery-enabled image will modify the underlying FUSE-related dynamic libraries.
// After the Recovery capability reaches beta status, these will be unified to the optimized libraries.
var defaultOssfs2RecoveryImageTag = "v2.0.1.ack.1-recovery-ecb0808-aliyun"
var defaultOssfs2Dbglevel = utils.DebugLevelInfo

type fuseOssfs2 struct {
Expand Down
57 changes: 52 additions & 5 deletions pkg/mounter/proxy/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,35 @@ import (
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy"
"golang.org/x/sys/unix"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
)

const (
// this should be longer than default timeout in server
defaultTimeout = time.Second * 35
FuseMountType = "fuse"
)

type Client interface {
Mount(req *proxy.MountRequest) (*proxy.Response, error)
}

type client struct {
mount.MounterForceUnmounter
timeout time.Duration
socketPath string
}

func NewClient(socketPath string) *client {
return &client{
socketPath: socketPath,
timeout: defaultTimeout,
func NewClient(socketPath string) (*client, error) {
m, ok := mount.New("").(mount.MounterForceUnmounter)
if !ok {
return nil, errors.New("failed to cast mounter to MounterForceUnmounter")
}
return &client{
MounterForceUnmounter: m,
socketPath: socketPath,
timeout: defaultTimeout,
}, nil
}

func (c *client) doRequest(req *proxy.Request) (*proxy.Response, error) {
Expand All @@ -57,7 +65,22 @@ func (c *client) doRequest(req *proxy.Request) (*proxy.Response, error) {
socket := int(connf.Fd())
defer connf.Close()

err = unix.Sendmsg(socket, append(data, proxy.MessageEnd), nil, nil, 0)
// 1. open /dev/fuse as root
fuseFd, err := unix.Open("/dev/fuse", unix.O_RDWR, 0o644)
if err != nil {
return nil, fmt.Errorf("open /dev/fuse: %w", err)
}
defer unix.Close(fuseFd)

// 2. mount FUSE filesystem with Fd
err = c.mountFuseFilesystemWithFd(req, fuseFd)
if err != nil {
return nil, err
}

// 3. send fd with unix conn
oob := unix.UnixRights(fuseFd)
err = unix.Sendmsg(socket, append(data, proxy.MessageEnd), oob, nil, 0)
if err != nil {
return nil, fmt.Errorf("sendmsg: %w", err)
}
Expand Down Expand Up @@ -93,3 +116,27 @@ func (c *client) Mount(req *proxy.MountRequest) (*proxy.Response, error) {
Body: req,
})
}

func (c *client) mountFuseFilesystemWithFd(req *proxy.Request, fd int) error {
mountReq, ok := req.Body.(*proxy.MountRequest)
if !ok {
return errors.New("invalid request body")
}
// 2.1 split FUSE options
// ex: rw,nosuid,nodev,relatime,user_id=0,group_id=0,allow_other
// fuseOptions, daemonOptions := splitFuseOptions(mountReq.Options)
// 2.2 add fd=`fuseFd` option
// fuseOptions = append(fuseOptions, fmt.Sprintf("fd=%v", fd))
fuseOptions, daemonOptions, err := splitFuseOptions(mountReq.Options)
if err != nil {
return err
}
fuseOptions = append(fuseOptions, fmt.Sprintf("fd=%v", fd))
err = c.MountSensitiveWithoutSystemdWithMountFlags(mountReq.Source, mountReq.Target, FuseMountType, fuseOptions, nil, []string{"--internal-only"})
if err != nil {
return fmt.Errorf("failed to mount the fuse filesystem: %w", err)
}
mountReq.Options = daemonOptions
req.Body = mountReq
return nil
}
109 changes: 109 additions & 0 deletions pkg/mounter/proxy/client/fuse_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package client

import (
"fmt"
"os"
"strconv"
"strings"

"k8s.io/apimachinery/pkg/util/sets"
)

const NULLVAL string = "&NULLVAL"

var defaultFuseOptionsMap = map[string]string{
"nodev": NULLVAL,
"nosuid": NULLVAL,
"rootmode": "40000",
// Note: user_id and group_id are the owner of the FUSE filesystem. Together with
// allow_other, they enable other users to access the FUSE filesystem. The uid and
// gid parameters of ossfs are application-level parameters used to simulate file permissions.
"allow_other": NULLVAL,
"default_permissions": NULLVAL,
"user_id": strconv.Itoa(os.Getuid()),
"group_id": strconv.Itoa(os.Getgid()),
}

type FuseOptionType int

const (
SupportedOption FuseOptionType = iota
UnsupportedOption
IgnoredOption
)

// Refer: https://man7.org/linux/man-pages/man8/mount.fuse3.8.html
// Note: Disable dev and suid options for security reasons. Disable rw option
// because it's the default mode, and we only need to set ro to disable write
// access, avoiding conflicts with ro settings from other mount points.
var fuseOptionKeys = map[string]FuseOptionType{
"exec": SupportedOption,
"noexec": SupportedOption,
"atime": SupportedOption,
"noatime": SupportedOption,
"sync": SupportedOption,
"async": SupportedOption,
"dirsync": SupportedOption,
"debug": SupportedOption,
"ro": SupportedOption,
"dev": UnsupportedOption,
"suid": UnsupportedOption,
"auto_unmount": UnsupportedOption,
"rw": IgnoredOption,
}

func splitFuseOptions(options []string) (fuseOptions, daemonOptions []string, err error) {
fuseOptionsMap := make(map[string]string)
optionSet := sets.New(options...)

// inital with default fuse options
for key, val := range defaultFuseOptionsMap {
fuseOptionsMap[key] = val
}

for _, o := range options {
kv := strings.SplitN(o, "=", 2)
// empty kv, invalid
if len(kv) == 0 {
optionSet.Delete(o)
continue
}
key := kv[0]
if key == "" {
optionSet.Delete(o)
continue
}
v, ok := defaultFuseOptionsMap[key]
// if the key is default, compare the value
// if not equal, update the defualt value
if ok {
if v != NULLVAL && len(kv) == 2 && v != kv[1] {
fuseOptionsMap[key] = kv[1]
}
continue
}
// if the key is not for fuse, see as daemon options
t, ok := fuseOptionKeys[key]
if !ok {
continue
}
switch t {
case SupportedOption:
fuseOptionsMap[key] = NULLVAL
case IgnoredOption:
optionSet.Delete(o)
case UnsupportedOption:
return nil, nil, fmt.Errorf("Unsupported option: %s", o)
}
}
daemonOptions = optionSet.UnsortedList()
for k, v := range fuseOptionsMap {
if v == NULLVAL {
fuseOptions = append(fuseOptions, k)
} else {
fuseOptions = append(fuseOptions, fmt.Sprintf("%s=%s", k, v))
}
}

return
}
Loading