Skip to content

Commit 7202992

Browse files
committed
refactor(mounter): reimplement interceptors following gRPC pattern
1 parent 2cfc19d commit 7202992

File tree

9 files changed

+100
-146
lines changed

9 files changed

+100
-146
lines changed
Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package interceptors
22

33
import (
4+
"context"
45
"fmt"
56
"os"
67
"path"
@@ -9,33 +10,29 @@ import (
910
"k8s.io/klog/v2"
1011
)
1112

12-
type AlinasSecretInterceptor struct{}
13+
var _ mounter.MountInterceptor = AlinasSecretInterceptor
1314

14-
var _ mounter.MountInterceptor = AlinasSecretInterceptor{}
15-
16-
func NewAlinasSecretInterceptor() mounter.MountInterceptor {
17-
return AlinasSecretInterceptor{}
18-
}
19-
20-
func (AlinasSecretInterceptor) BeforeMount(op *mounter.MountOperation, _ error) (*mounter.MountOperation, error) {
15+
func AlinasSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error {
2116
if op == nil || op.Secrets == nil {
22-
return op, nil
17+
return handler(ctx, op)
2318
}
2419

2520
tmpDir, err := os.MkdirTemp("", "alinas-")
2621
if err != nil {
27-
return op, err
22+
return err
2823
}
2924

3025
credFileContent := makeCredFileContent(op.Secrets)
3126
credFilePath := path.Join(tmpDir, op.VolumeID+".credentials")
3227
if err = os.WriteFile(credFilePath, credFileContent, 0o600); err != nil {
33-
return op, err
28+
os.RemoveAll(tmpDir) // cleanup in case of error
29+
return err
3430
}
3531

3632
klog.V(4).InfoS("Created alinas credential file", "path", credFilePath)
3733
op.Options = append(op.Options, "ram_config_file="+credFilePath)
38-
return op, nil
34+
35+
return handler(ctx, op)
3936
}
4037

4138
func makeCredFileContent(secrets map[string]string) []byte {
@@ -46,7 +43,3 @@ func makeCredFileContent(secrets map[string]string) []byte {
4643
secrets["akSecret"],
4744
)
4845
}
49-
50-
func (AlinasSecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error {
51-
return nil
52-
}

pkg/mounter/interceptors/ossfs2_secret.go

Lines changed: 0 additions & 41 deletions
This file was deleted.
Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,52 @@
11
package interceptors
22

33
import (
4+
"context"
45
"errors"
5-
"fmt"
66

77
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
88
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server"
9+
"k8s.io/klog/v2"
910
"k8s.io/mount-utils"
1011
)
1112

12-
type OssfsMonitorInterceptor struct {
13-
raw mount.Interface
14-
monitorManager *server.MountMonitorManager
15-
}
13+
var _ mounter.MountInterceptor = OssfsMonitorInterceptor
1614

17-
var _ mounter.MountInterceptor = OssfsMonitorInterceptor{}
15+
var (
16+
raw = mount.NewWithoutSystemd("")
17+
monitorManager = server.NewMountMonitorManager()
18+
)
1819

19-
func NewOssfsMonitorInterceptor() mounter.MountInterceptor {
20-
return OssfsMonitorInterceptor{
21-
raw: mount.NewWithoutSystemd(""),
22-
monitorManager: server.NewMountMonitorManager(),
20+
func OssfsMonitorInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error {
21+
if op == nil || op.MetricsPath == "" {
22+
return handler(ctx, op)
2323
}
24-
}
2524

26-
func (i OssfsMonitorInterceptor) BeforeMount(req *mounter.MountOperation, err error) (*mounter.MountOperation, error) {
27-
if req.MetricsPath == "" {
28-
return req, nil
29-
}
3025
// Get or create monitor for this target
31-
monitor, found := i.monitorManager.GetMountMonitor(req.Target, req.MetricsPath, i.raw, true)
26+
monitor, found := monitorManager.GetMountMonitor(op.Target, op.MetricsPath, raw, true)
3227
if monitor == nil {
33-
return req, fmt.Errorf("failed to get mount monitor for %s, stop monitoring mountpoint status", req.Target)
28+
klog.ErrorS(errors.New("failed to get mount monitor"), "stop monitoring mountpoint status", "mountpoint", op.Target)
29+
return handler(ctx, op)
3430
}
3531
if found {
3632
monitor.IncreaseMountRetryCount()
3733
}
38-
if err != nil {
39-
monitor.HandleMountFailureOrExit(err)
40-
}
41-
return req, nil
42-
}
4334

44-
func (i OssfsMonitorInterceptor) AfterMount(op *mounter.MountOperation, err error) error {
45-
if op.MetricsPath == "" {
46-
return nil
47-
}
48-
monitor, _ := i.monitorManager.GetMountMonitor(op.Target, op.MetricsPath, i.raw, true)
49-
if monitor == nil {
50-
return fmt.Errorf("failed to get mount monitor for %s", op.Target)
51-
}
35+
err := handler(ctx, op)
5236

5337
// Immediate process-exit handling during mount attempt
5438
// Assume the process exits with no error upon receiving SIGTERM,
5539
// and exits with an error in case of unexpected failures.
5640
monitor.HandleMountFailureOrExit(err)
5741

5842
if op.MountResult == nil {
59-
return nil
43+
return err
6044
}
6145

6246
res, ok := op.MountResult.(server.OssfsMountResult)
6347
if !ok {
64-
return errors.New("failed to parse ossfs mount result")
48+
klog.ErrorS(errors.New("failed to assert ossfs mount result type"), "skipping monitoring of mountpoint", "mountpoint", op.Target)
49+
return err
6550
}
6651

6752
go func() {
@@ -70,11 +55,11 @@ func (i OssfsMonitorInterceptor) AfterMount(op *mounter.MountOperation, err erro
7055
}()
7156

7257
if err != nil {
73-
return nil
58+
return err
7459
}
7560

7661
monitor.HandleMountSuccess(res.PID)
7762
// Start monitoring goroutine (ticker based only)
78-
i.monitorManager.StartMonitoring(op.Target)
63+
monitorManager.StartMonitoring(op.Target)
7964
return nil
8065
}
Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,60 @@
11
package interceptors
22

33
import (
4-
"fmt"
4+
"context"
5+
"errors"
56
"os"
67

78
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
9+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy/server"
810
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils"
11+
"k8s.io/klog/v2"
912
)
1013

11-
type OssfsSecretInterceptor struct {
12-
passwdFile string
13-
}
14+
var _ mounter.MountInterceptor = OssfsSecretInterceptor
1415

15-
var _ mounter.MountInterceptor = OssfsSecretInterceptor{}
16+
func OssfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error {
17+
return ossfsSecretInterceptor(ctx, op, handler, "ossfs")
18+
}
1619

17-
func NewOssfsSecretInterceptor() mounter.MountInterceptor {
18-
return OssfsSecretInterceptor{}
20+
func Ossfs2SecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler) error {
21+
return ossfsSecretInterceptor(ctx, op, handler, "ossfs2")
1922
}
2023

21-
func (i OssfsSecretInterceptor) BeforeMount(req *mounter.MountOperation, _ error) (*mounter.MountOperation, error) {
22-
var err error
23-
i.passwdFile, err = utils.SaveOssSecretsToFile(req.Secrets, req.FsType)
24+
func ossfsSecretInterceptor(ctx context.Context, op *mounter.MountOperation, handler mounter.MountHandler, fuseType string) error {
25+
passwdFile, err := utils.SaveOssSecretsToFile(op.Secrets, op.FsType)
2426
if err != nil {
25-
return req, err
27+
return err
2628
}
27-
if i.passwdFile != "" {
28-
req.Options = append(req.Options, "passwd_file="+i.passwdFile)
29+
if passwdFile != "" {
30+
if fuseType == "ossfs" {
31+
op.Args = append(op.Args, "passwd_file="+passwdFile)
32+
} else {
33+
op.Args = append(op.Args, []string{"-c", passwdFile}...)
34+
}
2935
}
30-
return req, nil
31-
}
3236

33-
func (i OssfsSecretInterceptor) AfterMount(op *mounter.MountOperation, err error) error {
34-
if i.passwdFile == "" {
37+
if err = handler(ctx, op); err != nil {
38+
return err
39+
}
40+
41+
if passwdFile == "" || op.MountResult == nil {
3542
return nil
3643
}
37-
if err := os.Remove(i.passwdFile); err != nil {
38-
return fmt.Errorf("error removing passwd file: %w, mountpoint=%s, path=%s", err, op.Target, i.passwdFile)
44+
result, ok := op.MountResult.(*server.OssfsMountResult)
45+
if !ok {
46+
klog.ErrorS(
47+
errors.New("failed to assert ossfs mount result"),
48+
"skipping cleanup of passwd file", "mountpoint", op.Target, "path", passwdFile,
49+
)
50+
return nil
3951
}
52+
53+
go func() {
54+
<-result.ExitChan
55+
if err := os.Remove(passwdFile); err != nil {
56+
klog.ErrorS(err, "failed to cleanup ossfs passwd file", "mountpoint", op.Target, "path", passwdFile)
57+
}
58+
}()
4059
return nil
4160
}

pkg/mounter/mounter.go

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package mounter
22

33
import (
44
"context"
5-
"fmt"
65

7-
"k8s.io/klog/v2"
86
mountutils "k8s.io/mount-utils"
97
)
108

@@ -26,46 +24,46 @@ type MountOperation struct {
2624
MountResult any
2725
}
2826

29-
type MountInterceptor interface {
30-
BeforeMount(op *MountOperation, err error) (*MountOperation, error)
31-
AfterMount(op *MountOperation, err error) error
32-
}
27+
type MountHandler func(ctx context.Context, op *MountOperation) error
28+
29+
type MountInterceptor func(ctx context.Context, op *MountOperation, handler MountHandler) error
3330

3431
type MountWorkflow struct {
3532
Mounter
36-
interceptors []MountInterceptor
33+
chainedHandler MountHandler
3734
}
3835

3936
var _ Mounter = &MountWorkflow{}
4037

4138
func (w *MountWorkflow) ExtendedMount(ctx context.Context, op *MountOperation) error {
42-
var err error
43-
for i, interceptor := range w.interceptors {
44-
if op, err = interceptor.BeforeMount(op, err); err != nil && i != len(w.interceptors)-1 {
45-
// Log error but continue to the next interceptor, since some interceptors may
46-
// want to handle the error, e.g. the OssMonitorInterceptor.
47-
// Only log for the intermediate interceptors, otherwise the final error will be printed twice.
48-
klog.ErrorS(err, "error executing BeforeMount interceptor")
49-
}
39+
return w.chainedHandler(ctx, op)
40+
}
41+
42+
// chainInterceptors creates a chain of interceptors similar to gRPC
43+
func chainInterceptors(interceptors []MountInterceptor, finalHandler MountHandler) MountHandler {
44+
if len(interceptors) == 0 {
45+
return finalHandler
5046
}
51-
if err != nil {
52-
return fmt.Errorf("error executing BeforeMount interceptor: %w", err)
47+
48+
return func(ctx context.Context, op *MountOperation) error {
49+
return interceptors[0](ctx, op, getChainHandler(interceptors, 0, finalHandler))
50+
}
51+
}
52+
53+
// getChainHandler creates a handler that chains interceptors recursively
54+
func getChainHandler(interceptors []MountInterceptor, curr int, finalHandler MountHandler) MountHandler {
55+
if curr == len(interceptors)-1 {
56+
return finalHandler
5357
}
5458

55-
err = w.Mounter.ExtendedMount(ctx, op)
56-
for _, interceptor := range w.interceptors {
57-
if afterErr := interceptor.AfterMount(op, err); afterErr != nil {
58-
// Log error but continue to the next interceptor.
59-
// Do not override the original error from mount operation.
60-
klog.ErrorS(afterErr, "error executing AfterMount interceptor")
61-
}
59+
return func(ctx context.Context, op *MountOperation) error {
60+
return interceptors[curr+1](ctx, op, getChainHandler(interceptors, curr+1, finalHandler))
6261
}
63-
return err
6462
}
6563

6664
func NewForMounter(m Mounter, interceptors ...MountInterceptor) Mounter {
6765
return &MountWorkflow{
68-
Mounter: m,
69-
interceptors: interceptors,
66+
Mounter: m,
67+
chainedHandler: chainInterceptors(interceptors, m.ExtendedMount),
7068
}
7169
}

pkg/mounter/proxy/server/alinas/driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func init() {
3030
server.RegisterDriver(&Driver{
3131
Mounter: mounter.NewForMounter(
3232
&extendedMounter{Interface: mount.New("")},
33-
interceptors.NewAlinasSecretInterceptor(),
33+
interceptors.AlinasSecretInterceptor,
3434
),
3535
})
3636
}

0 commit comments

Comments
 (0)