Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 23 additions & 31 deletions pkg/mounter/cmd_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,32 @@ import (
"os/exec"
"time"

"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils"
"k8s.io/mount-utils"
)

const timeout = time.Second * 10

type OssCmdMounter struct {
mount.Interface
volumeId string
execPath string
volumeID string
mount.Interface
}

func NewOssCmdMounter(execPath, volumeId string, inner mount.Interface) Mounter {
var _ Mounter = &OssCmdMounter{}

func NewOssCmdMounter(execPath, volumeID string, inner mount.Interface) Mounter {
return &OssCmdMounter{
execPath: execPath,
volumeId: volumeId,
volumeID: volumeID,
Interface: inner,
}
}

func (m *OssCmdMounter) ExtendedMount(source, target, fstype string, options []string, params *ExtendedMountParams) error {
func (m *OssCmdMounter) ExtendedMount(_ context.Context, op *MountOperation) error {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(timeout))
defer cancel()

// Parameters in ExtendedMountParams are optional
if params == nil {
params = &ExtendedMountParams{}
}
passwd, err := utils.SaveOssSecretsToFile(params.Secrets, fstype)
if err != nil {
return err
}

args := getArgs(source, target, fstype, passwd, options)
cmd := exec.CommandContext(ctx, m.execPath, args...)
cmd := exec.CommandContext(ctx, m.execPath, getArgs(op)...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

Expand All @@ -51,20 +42,21 @@ func (m *OssCmdMounter) ExtendedMount(source, target, fstype string, options []s
return nil
}

func getArgs(source, target, fstype, passwdFile string, options []string) []string {
if fstype == "ossfs" {
if passwdFile != "" {
options = append(options, "passwd_file="+passwdFile)
}
return mount.MakeMountArgs(source, target, "", options)
func getArgs(op *MountOperation) []string {
if op == nil {
return nil
}
// ossfs2
args := []string{"mount", target}
if passwdFile != "" {
args = append(args, []string{"-c", passwdFile}...)
}
for _, o := range options {
args = append(args, fmt.Sprintf("--%s", o))
switch op.FsType {
case "ossfs":
return mount.MakeMountArgs(op.Source, op.Target, "", op.Options)
case "ossfs2":
args := []string{"mount", op.Target}
args = append(args, op.Args...)
for _, o := range op.Options {
args = append(args, fmt.Sprintf("--%s", o))
}
return args
default:
return nil
}
return args
}
56 changes: 56 additions & 0 deletions pkg/mounter/interceptors/alinas_secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package interceptors

import (
"context"
"fmt"
"os"
"path"

"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
"k8s.io/klog/v2"
)

var _ mounter.MountInterceptor = AlinasSecretInterceptor

func AlinasSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error {
if op == nil || op.Secrets == nil {
return handler(ctx, op)
}

tmpCredFile, err := os.CreateTemp("", op.VolumeID+"-*.credentials")
if err != nil {
return err
}
defer func() {
if err = os.Remove(tmpCredFile.Name()); err != nil && !os.IsNotExist(err) {
klog.ErrorS(err, "Failed to remove temporary alinas credential file", "path", tmpCredFile.Name())
}
}()

credFileContent := makeCredFileContent(op.Secrets)
if _, err = tmpCredFile.Write(credFileContent); err != nil {
return err
}
if err = tmpCredFile.Close(); err != nil {
return err
}

credFilePath := path.Join(os.TempDir(), op.VolumeID+".credentials")
if err = os.Rename(tmpCredFile.Name(), credFilePath); err != nil {
return err
}

klog.V(4).InfoS("Created alinas credential file", "path", credFilePath)
op.Options = append(op.Options, "ram_config_file="+credFilePath)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test mounting two APs on the same node with different AK/SKs.


return handler(ctx, op)
}

func makeCredFileContent(secrets map[string]string) []byte {
return fmt.Appendf(
nil,
"[NASCredentials]\naccessKeyID=%s\naccessKeySecret=%s",
secrets["akId"],
secrets["akSecret"],
)
}
65 changes: 65 additions & 0 deletions pkg/mounter/interceptors/ossfs_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package interceptors

import (
"context"
"errors"

"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server"
"k8s.io/klog/v2"
"k8s.io/mount-utils"
)

var _ mounter.MountInterceptor = OssfsMonitorInterceptor

var (
raw = mount.NewWithoutSystemd("")
monitorManager = server.NewMountMonitorManager()
)

func OssfsMonitorInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error {
if op == nil || op.MetricsPath == "" {
return handler(ctx, op)
}

// Get or create monitor for this target
monitor, found := monitorManager.GetMountMonitor(op.Target, op.MetricsPath, raw, true)
if monitor == nil {
klog.ErrorS(errors.New("failed to get mount monitor"), "stop monitoring mountpoint status", "mountpoint", op.Target)
return handler(ctx, op)
}
if found {
monitor.IncreaseMountRetryCount()
}

err := handler(ctx, op)

// Immediate process-exit handling during mount attempt
// Assume the process exits with no error upon receiving SIGTERM,
// and exits with an error in case of unexpected failures.
monitor.HandleMountFailureOrExit(err)

if op.MountResult == nil {
return err
}

res, ok := op.MountResult.(server.OssfsMountResult)
if !ok {
klog.ErrorS(errors.New("failed to assert ossfs mount result type"), "skipping monitoring of mountpoint", "mountpoint", op.Target)
return err
}

go func() {
err := <-res.ExitChan
monitor.HandleMountFailureOrExit(err)
}()

if err != nil {
return err
}

monitor.HandleMountSuccess(res.PID)
// Start monitoring goroutine (ticker based only)
monitorManager.StartMonitoring(op.Target)
return nil
}
60 changes: 60 additions & 0 deletions pkg/mounter/interceptors/ossfs_secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package interceptors

import (
"context"
"errors"
"os"

"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils"
"k8s.io/klog/v2"
)

var _ mounter.MountInterceptor = OssfsSecretInterceptor

func OssfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error {
return ossfsSecretInterceptor(ctx, op, handler, "ossfs")
}

func Ossfs2SecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error {
return ossfsSecretInterceptor(ctx, op, handler, "ossfs2")
}

func ossfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler, fuseType string) error {
passwdFile, err := utils.SaveOssSecretsToFile(op.Secrets, op.FsType)
if err != nil {
return err
}
if passwdFile != "" {
if fuseType == "ossfs" {
op.Args = append(op.Args, "passwd_file="+passwdFile)
} else {
op.Args = append(op.Args, []string{"-c", passwdFile}...)
}
}

if err = handler(ctx, op); err != nil {
return err
}

if passwdFile == "" || op.MountResult == nil {
return nil
}
result, ok := op.MountResult.(*server.OssfsMountResult)
if !ok {
klog.ErrorS(
errors.New("failed to assert ossfs mount result"),
"skipping cleanup of passwd file", "mountpoint", op.Target, "path", passwdFile,
)
return nil
}

go func() {
<-result.ExitChan
if err := os.Remove(passwdFile); err != nil {
klog.ErrorS(err, "failed to cleanup ossfs passwd file", "mountpoint", op.Target, "path", passwdFile)
}
}()
return nil
}
62 changes: 58 additions & 4 deletions pkg/mounter/mounter.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,69 @@
package mounter

import (
"context"

mountutils "k8s.io/mount-utils"
)

type ExtendedMountParams struct {
type Mounter interface {
mountutils.Interface
ExtendedMount(ctx context.Context, op *MountOperation) error
}

type MountOperation struct {
Source string
Target string
FsType string
Options []string
Args []string
Secrets map[string]string
MetricsPath string
VolumeID string

MountResult any
}

type Mounter interface {
mountutils.Interface
ExtendedMount(source, target, fstype string, options []string, params *ExtendedMountParams) error
type MountHandler func(ctx context.Context, op *MountOperation) error

type MountInterceptor func(ctx context.Context, op *MountOperation, handler MountHandler) error

type MountWorkflow struct {
Mounter
chainedHandler MountHandler
}

var _ Mounter = &MountWorkflow{}

func (w *MountWorkflow) ExtendedMount(ctx context.Context, op *MountOperation) error {
return w.chainedHandler(ctx, op)
}

// chainInterceptors creates a chain of interceptors similar to gRPC
func chainInterceptors(interceptors []MountInterceptor, finalHandler MountHandler) MountHandler {
if len(interceptors) == 0 {
return finalHandler
}

return func(ctx context.Context, op *MountOperation) error {
return interceptors[0](ctx, op, getChainHandler(interceptors, 0, finalHandler))
}
}

// getChainHandler creates a handler that chains interceptors recursively
func getChainHandler(interceptors []MountInterceptor, curr int, finalHandler MountHandler) MountHandler {
if curr == len(interceptors)-1 {
return finalHandler
}

return func(ctx context.Context, op *MountOperation) error {
return interceptors[curr+1](ctx, op, getChainHandler(interceptors, curr+1, finalHandler))
}
}

func NewForMounter(m Mounter, interceptors ...MountInterceptor) Mounter {
return &MountWorkflow{
Mounter: m,
chainedHandler: chainInterceptors(interceptors, m.ExtendedMount),
}
}
1 change: 1 addition & 0 deletions pkg/mounter/proxy/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ type MountRequest struct {
MountFlags []string `json:"mountFlags,omitempty"`
Secrets map[string]string `json:"secrets,omitempty"`
MetricsPath string `json:"metricsPath,omitempty"`
VolumeID string `json:"volumeID,omitempty"`
}
Loading
Loading