Skip to content

Commit 2cfc19d

Browse files
committed
refactor(mounter): use ossfs secret and monitor interceptors in ossfs mount proxy servers
1 parent 7072db2 commit 2cfc19d

File tree

11 files changed

+318
-172
lines changed

11 files changed

+318
-172
lines changed

pkg/mounter/cmd_mounter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ func getArgs(op *MountOperation) []string {
5151
return mount.MakeMountArgs(op.Source, op.Target, "", op.Options)
5252
case "ossfs2":
5353
args := []string{"mount", op.Target}
54+
args = append(args, op.Args...)
5455
for _, o := range op.Options {
5556
args = append(args, fmt.Sprintf("--%s", o))
5657
}

pkg/mounter/interceptors/alinas_secret.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func NewAlinasSecretInterceptor() mounter.MountInterceptor {
1717
return AlinasSecretInterceptor{}
1818
}
1919

20-
func (AlinasSecretInterceptor) BeforeMount(op *mounter.MountOperation) (*mounter.MountOperation, error) {
20+
func (AlinasSecretInterceptor) BeforeMount(op *mounter.MountOperation, _ error) (*mounter.MountOperation, error) {
2121
if op == nil || op.Secrets == nil {
2222
return op, nil
2323
}
@@ -46,3 +46,7 @@ func makeCredFileContent(secrets map[string]string) []byte {
4646
secrets["akSecret"],
4747
)
4848
}
49+
50+
func (AlinasSecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error {
51+
return nil
52+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package interceptors
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
8+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils"
9+
)
10+
11+
type Ossfs2SecretInterceptor struct {
12+
passwdFile string
13+
}
14+
15+
var _ mounter.MountInterceptor = Ossfs2SecretInterceptor{}
16+
17+
func NewOssfs2SecretInterceptor() mounter.MountInterceptor {
18+
return Ossfs2SecretInterceptor{}
19+
}
20+
21+
func (i Ossfs2SecretInterceptor) BeforeMount(req *mounter.MountOperation, _ error) (*mounter.MountOperation, error) {
22+
var err error
23+
i.passwdFile, err = utils.SaveOssSecretsToFile(req.Secrets, req.FsType)
24+
if err != nil {
25+
return req, err
26+
}
27+
if i.passwdFile != "" {
28+
req.Args = append(req.Args, []string{"-c", i.passwdFile}...)
29+
}
30+
return req, nil
31+
}
32+
33+
func (i Ossfs2SecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error {
34+
if i.passwdFile == "" {
35+
return nil
36+
}
37+
if err := os.Remove(i.passwdFile); err != nil {
38+
return fmt.Errorf("error removing configuration file: %w, mountpoint=%s, path=%s", err, op.Target, i.passwdFile)
39+
}
40+
return nil
41+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package interceptors
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
8+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server"
9+
"k8s.io/mount-utils"
10+
)
11+
12+
type OssfsMonitorInterceptor struct {
13+
raw mount.Interface
14+
monitorManager *server.MountMonitorManager
15+
}
16+
17+
var _ mounter.MountInterceptor = OssfsMonitorInterceptor{}
18+
19+
func NewOssfsMonitorInterceptor() mounter.MountInterceptor {
20+
return OssfsMonitorInterceptor{
21+
raw: mount.NewWithoutSystemd(""),
22+
monitorManager: server.NewMountMonitorManager(),
23+
}
24+
}
25+
26+
func (i OssfsMonitorInterceptor) BeforeMount(req *mounter.MountOperation, err error) (*mounter.MountOperation, error) {
27+
if req.MetricsPath == "" {
28+
return req, nil
29+
}
30+
// Get or create monitor for this target
31+
monitor, found := i.monitorManager.GetMountMonitor(req.Target, req.MetricsPath, i.raw, true)
32+
if monitor == nil {
33+
return req, fmt.Errorf("failed to get mount monitor for %s, stop monitoring mountpoint status", req.Target)
34+
}
35+
if found {
36+
monitor.IncreaseMountRetryCount()
37+
}
38+
if err != nil {
39+
monitor.HandleMountFailureOrExit(err)
40+
}
41+
return req, nil
42+
}
43+
44+
func (i OssfsMonitorInterceptor) AfterMount(op *mounter.MountOperation, err error) error {
45+
if op.MetricsPath == "" {
46+
return nil
47+
}
48+
monitor, _ := i.monitorManager.GetMountMonitor(op.Target, op.MetricsPath, i.raw, true)
49+
if monitor == nil {
50+
return fmt.Errorf("failed to get mount monitor for %s", op.Target)
51+
}
52+
53+
// Immediate process-exit handling during mount attempt
54+
// Assume the process exits with no error upon receiving SIGTERM,
55+
// and exits with an error in case of unexpected failures.
56+
monitor.HandleMountFailureOrExit(err)
57+
58+
if op.MountResult == nil {
59+
return nil
60+
}
61+
62+
res, ok := op.MountResult.(server.OssfsMountResult)
63+
if !ok {
64+
return errors.New("failed to parse ossfs mount result")
65+
}
66+
67+
go func() {
68+
err := <-res.ExitChan
69+
monitor.HandleMountFailureOrExit(err)
70+
}()
71+
72+
if err != nil {
73+
return nil
74+
}
75+
76+
monitor.HandleMountSuccess(res.PID)
77+
// Start monitoring goroutine (ticker based only)
78+
i.monitorManager.StartMonitoring(op.Target)
79+
return nil
80+
}
Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,41 @@
11
package interceptors
22

33
import (
4+
"fmt"
5+
"os"
6+
47
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
58
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils"
69
)
710

811
type OssfsSecretInterceptor struct {
9-
credFileKey string
12+
passwdFile string
1013
}
1114

1215
var _ mounter.MountInterceptor = OssfsSecretInterceptor{}
1316

1417
func NewOssfsSecretInterceptor() mounter.MountInterceptor {
15-
return OssfsSecretInterceptor{
16-
credFileKey: "passwd_file",
17-
}
18-
}
19-
20-
func NewOssfs2SecretInterceptor() mounter.MountInterceptor {
21-
return OssfsSecretInterceptor{
22-
credFileKey: "-c",
23-
}
18+
return OssfsSecretInterceptor{}
2419
}
2520

26-
func (i OssfsSecretInterceptor) BeforeMount(req *mounter.MountOperation) (*mounter.MountOperation, error) {
27-
filePath, err := utils.SaveOssSecretsToFile(req.Secrets, req.FsType)
21+
func (i OssfsSecretInterceptor) BeforeMount(req *mounter.MountOperation, _ error) (*mounter.MountOperation, error) {
22+
var err error
23+
i.passwdFile, err = utils.SaveOssSecretsToFile(req.Secrets, req.FsType)
2824
if err != nil {
2925
return req, err
3026
}
31-
if filePath != "" {
32-
req.Options = append(req.Options, i.credFileKey+"="+filePath)
27+
if i.passwdFile != "" {
28+
req.Options = append(req.Options, "passwd_file="+i.passwdFile)
3329
}
3430
return req, nil
3531
}
32+
33+
func (i OssfsSecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error {
34+
if i.passwdFile == "" {
35+
return nil
36+
}
37+
if err := os.Remove(i.passwdFile); err != nil {
38+
return fmt.Errorf("error removing passwd file: %w, mountpoint=%s, path=%s", err, op.Target, i.passwdFile)
39+
}
40+
return nil
41+
}

pkg/mounter/mounter.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package mounter
22

33
import (
44
"context"
5+
"fmt"
56

7+
"k8s.io/klog/v2"
68
mountutils "k8s.io/mount-utils"
79
)
810

@@ -16,13 +18,17 @@ type MountOperation struct {
1618
Target string
1719
FsType string
1820
Options []string
21+
Args []string
1922
Secrets map[string]string
2023
MetricsPath string
2124
VolumeID string
25+
26+
MountResult any
2227
}
2328

2429
type MountInterceptor interface {
25-
BeforeMount(op *MountOperation) (*MountOperation, error)
30+
BeforeMount(op *MountOperation, err error) (*MountOperation, error)
31+
AfterMount(op *MountOperation, err error) error
2632
}
2733

2834
type MountWorkflow struct {
@@ -32,13 +38,29 @@ type MountWorkflow struct {
3238

3339
var _ Mounter = &MountWorkflow{}
3440

35-
func (w *MountWorkflow) ExtendedMount(ctx context.Context, op *MountOperation) (err error) {
41+
func (w *MountWorkflow) ExtendedMount(ctx context.Context, op *MountOperation) error {
42+
var err error
43+
for i, interceptor := range w.interceptors {
44+
if op, err = interceptor.BeforeMount(op, err); err != nil && i != len(w.interceptors)-1 {
45+
// Log error but continue to the next interceptor, since some interceptors may
46+
// want to handle the error, e.g. the OssMonitorInterceptor.
47+
// Only log for the intermediate interceptors, otherwise the final error will be printed twice.
48+
klog.ErrorS(err, "error executing BeforeMount interceptor")
49+
}
50+
}
51+
if err != nil {
52+
return fmt.Errorf("error executing BeforeMount interceptor: %w", err)
53+
}
54+
55+
err = w.Mounter.ExtendedMount(ctx, op)
3656
for _, interceptor := range w.interceptors {
37-
if op, err = interceptor.BeforeMount(op); err != nil {
38-
return err
57+
if afterErr := interceptor.AfterMount(op, err); afterErr != nil {
58+
// Log error but continue to the next interceptor.
59+
// Do not override the original error from mount operation.
60+
klog.ErrorS(afterErr, "error executing AfterMount interceptor")
3961
}
4062
}
41-
return w.Mounter.ExtendedMount(ctx, op)
63+
return err
4264
}
4365

4466
func NewForMounter(m Mounter, interceptors ...MountInterceptor) Mounter {

pkg/mounter/proxy/server/metrics.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package server
33
import (
44
"fmt"
55
"os"
6-
"os/exec"
76
"path/filepath"
87
"strconv"
98
"sync"
@@ -125,15 +124,15 @@ func (m *MountMonitor) HandleMountFailureOrExit(err error) {
125124
}
126125

127126
// HandleMountSuccess handles the case when mount operation succeeds
128-
func (m *MountMonitor) HandleMountSuccess(process *exec.Cmd) {
127+
func (m *MountMonitor) HandleMountSuccess(pid int) {
129128
m.mu.Lock()
130129
defer m.mu.Unlock()
131130

132131
// Update metrics for mount success
133132
m.updateMountPointMetrics(&m.retryCount, nil, nil)
134133

134+
m.Pid = pid
135135
m.State = MonitorStateMonitoring
136-
m.Pid = process.Process.Pid
137136

138137
klog.InfoS("Mount succeeded", "target", m.Target, "pid", m.Pid)
139138
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package server
2+
3+
type OssfsMountResult struct {
4+
PID int
5+
ExitChan chan error
6+
}

0 commit comments

Comments
 (0)