Skip to content

Commit afc1271

Browse files
committed
Add new option in nodeserver.go to mounting using blobfuse proxy
1 parent 7241cc7 commit afc1271

File tree

11 files changed

+116
-23
lines changed

11 files changed

+116
-23
lines changed

Makefile

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ BUILD_DATE ?= $(shell date -u +"%Y-%m-%dT%H:%M:%SZ")
3131
LDFLAGS ?= "-X ${PKG}/pkg/blob.driverVersion=${IMAGE_VERSION} -X ${PKG}/pkg/blob.gitCommit=${GIT_COMMIT} -X ${PKG}/pkg/blob.buildDate=${BUILD_DATE} -s -w -extldflags '-static'"
3232
E2E_HELM_OPTIONS ?= --set image.blob.pullPolicy=Always --set image.blob.repository=$(REGISTRY)/$(IMAGE_NAME) --set image.blob.tag=$(IMAGE_VERSION)
3333
ifdef ENABLE_BLOBFUSE_PROXY
34-
override E2E_HELM_OPTIONS := --set image.blob.pullPolicy=Always --set image.blob.repository=$(REGISTRY)/$(IMAGE_NAME) --set image.blob.tag=$(IMAGE_VERSION) --set controller.logLevel=6 --set node.logLevel=6
34+
override E2E_HELM_OPTIONS := $(E2E_HELM_OPTIONS) --set controller.logLevel=6 --set node.logLevel=6 --set node.enableBlobfuseProxy=true
3535
endif
3636
GINKGO_FLAGS = -ginkgo.v
3737
GO111MODULE = on
@@ -66,6 +66,9 @@ e2e-test:
6666
e2e-bootstrap: install-helm install-blobfuse-proxy
6767
# Only build and push the image if it does not exist in the registry
6868
docker pull $(IMAGE_TAG) || make blob-container push
69+
if [[ -z "$(ENABLE_BLOBFUSE_PROXY)" ]]; then \
70+
make install-blobfuse-proxy;\
71+
fi
6972
helm install blob-csi-driver ./charts/latest/blob-csi-driver --namespace kube-system --wait --timeout=15m -v=5 --debug \
7073
--set controller.runOnMaster=true \
7174
--set controller.replicas=1 \
@@ -168,11 +171,7 @@ blobfuse-proxy-container:
168171

169172
.PHONY: install-blobfuse-proxy
170173
install-blobfuse-proxy:
171-
ifdef ENABLE_BLOBFUSE_PROXY
172174
kubectl apply -f ./deploy/blobfuse-proxy/blobfuse-proxy.yaml
173-
else
174-
echo "ENABLE_BLOBFUSE_PROXY env not found"
175-
endif
176175

177176
.PHONY: uninstall-blobfuse-proxy
178177
uninstall-blobfuse-proxy:
63 Bytes
Binary file not shown.

charts/latest/blob-csi-driver/templates/csi-blob-node.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ spec:
8585
args:
8686
- "--v={{ .Values.node.logLevel }}"
8787
- "--endpoint=$(CSI_ENDPOINT)"
88+
- "--blobfuse-proxy-endpoint=$(BLOBFUSE_PROXY_ENDPOINT)"
89+
- "--enable-blobfuse-proxy={{ .Values.node.enableBlobfuseProxy }}"
8890
- "--nodeid=$(KUBE_NODE_NAME)"
8991
- "--metrics-address=0.0.0.0:{{ .Values.node.metricsPort }}"
9092
ports:
@@ -111,6 +113,8 @@ spec:
111113
optional: true
112114
- name: CSI_ENDPOINT
113115
value: unix:///csi/csi.sock
116+
- name: BLOBFUSE_PROXY_ENDPOINT
117+
value: unix:///csi/blobfuse-proxy.sock
114118
- name: KUBE_NODE_NAME
115119
valueFrom:
116120
fieldRef:

charts/latest/blob-csi-driver/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ controller:
4040
node:
4141
metricsPort: 29635
4242
logLevel: 5
43+
enableBlobfuseProxy: false
4344

4445
kubelet:
4546
linuxPath: /var/lib/kubelet

deploy/csi-blob-node.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ spec:
8181
args:
8282
- "--v=5"
8383
- "--endpoint=$(CSI_ENDPOINT)"
84+
- "--blobfuse-proxy-endpoint=$(BLOBFUSE_PROXY_ENDPOINT)"
8485
- "--nodeid=$(KUBE_NODE_NAME)"
8586
- "--metrics-address=0.0.0.0:29635"
8687
ports:
@@ -107,6 +108,8 @@ spec:
107108
optional: true
108109
- name: CSI_ENDPOINT
109110
value: unix:///csi/csi.sock
111+
- name: BLOBFUSE_PROXY_ENDPOINT
112+
value: unix:///csi/blobfuse-proxy.sock
110113
- name: KUBE_NODE_NAME
111114
valueFrom:
112115
fieldRef:

pkg/blob/blob.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,23 +93,29 @@ var (
9393
// Driver implements all interfaces of CSI drivers
9494
type Driver struct {
9595
csicommon.CSIDriver
96-
cloud *azure.Cloud
97-
mounter *mount.SafeFormatAndMount
98-
volLockMap *util.LockMap
96+
cloud *azure.Cloud
97+
blobfuseProxyEndpoint string
98+
enableBlobfuseProxy bool
99+
blobfuseProxyConnTimout int
100+
mounter *mount.SafeFormatAndMount
101+
volLockMap *util.LockMap
99102
// A map storing all volumes with ongoing operations so that additional operations
100103
// for that same volume (as defined by VolumeID) return an Aborted error
101104
volumeLocks *volumeLocks
102105
}
103106

104107
// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
105108
// does not support optional driver plugin info manifest field. Refer to CSI spec for more details.
106-
func NewDriver(nodeID string) *Driver {
109+
func NewDriver(nodeID, blobfuseProxyEndpoint string, enableBlobfuseProxy bool, blobfuseProxyConnTimout int) *Driver {
107110
driver := Driver{}
108111
driver.Name = DriverName
109112
driver.Version = driverVersion
110113
driver.NodeID = nodeID
111114
driver.volLockMap = util.NewLockMap()
112115
driver.volumeLocks = newVolumeLocks()
116+
driver.blobfuseProxyEndpoint = blobfuseProxyEndpoint
117+
driver.enableBlobfuseProxy = enableBlobfuseProxy
118+
driver.blobfuseProxyConnTimout = blobfuseProxyConnTimout
113119
return &driver
114120
}
115121

pkg/blob/blob_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,19 @@ const (
4444
)
4545

4646
func NewFakeDriver() *Driver {
47-
driver := NewDriver(fakeNodeID)
47+
driver := NewDriver(fakeNodeID, "", false, 5)
4848
driver.Name = fakeDriverName
4949
driver.Version = vendorVersion
5050
return driver
5151
}
5252

5353
func TestNewFakeDriver(t *testing.T) {
54-
d := NewDriver(fakeNodeID)
54+
d := NewDriver(fakeNodeID, "", false, 5)
5555
assert.NotNil(t, d)
5656
}
5757

5858
func TestNewDriver(t *testing.T) {
59-
driver := NewDriver(fakeNodeID)
59+
driver := NewDriver(fakeNodeID, "", false, 5)
6060
fakedriver := NewFakeDriver()
6161
fakedriver.Name = DriverName
6262
fakedriver.Version = driverVersion

pkg/blob/nodeserver.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,20 @@ import (
3838
"google.golang.org/grpc/status"
3939

4040
"golang.org/x/net/context"
41+
"google.golang.org/grpc"
42+
mount_azure_blob "sigs.k8s.io/blob-csi-driver/pkg/blobfuse-proxy/pb"
4143
)
4244

45+
type MountClient struct {
46+
service mount_azure_blob.MountServiceClient
47+
}
48+
49+
// NewMountClient returns a new mount client
50+
func NewMountClient(cc *grpc.ClientConn) *MountClient {
51+
service := mount_azure_blob.NewMountServiceClient(cc)
52+
return &MountClient{service}
53+
}
54+
4355
// NodePublishVolume mount the volume from staging to target path
4456
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
4557
if req.GetVolumeCapability() == nil {
@@ -91,6 +103,38 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
91103
return &csi.NodePublishVolumeResponse{}, nil
92104
}
93105

106+
func (d *Driver) mountBlobfuseWithProxy(args string, authEnv []string) (string, error) {
107+
klog.V(2).Infof("mouting using blobfuse proxy")
108+
var resp *mount_azure_blob.MountAzureBlobResponse
109+
var output string
110+
connectionTimout := time.Duration(d.blobfuseProxyConnTimout)
111+
ctx, cancel := context.WithTimeout(context.Background(), connectionTimout*time.Second)
112+
defer cancel()
113+
conn, err := grpc.DialContext(ctx, d.blobfuseProxyEndpoint, grpc.WithInsecure(), grpc.WithBlock())
114+
if err == nil {
115+
mountClient := NewMountClient(conn)
116+
mountreq := mount_azure_blob.MountAzureBlobRequest{
117+
MountArgs: args,
118+
AuthEnv: authEnv,
119+
}
120+
klog.V(2).Infof("calling BlobfuseProxy: MountAzureBlob function")
121+
resp, err = mountClient.service.MountAzureBlob(context.TODO(), &mountreq)
122+
if err != nil {
123+
klog.Error("GRPC call returned with an error:", err)
124+
}
125+
output = resp.GetOutput()
126+
}
127+
return output, err
128+
}
129+
130+
func (d *Driver) mountBlobfuseInsideDriver(args string, authEnv []string) (string, error) {
131+
klog.V(2).Infof("mounting blobfuse inside driver")
132+
cmd := exec.Command("blobfuse", strings.Split(args, " ")...)
133+
cmd.Env = append(os.Environ(), authEnv...)
134+
output, err := cmd.CombinedOutput()
135+
return string(output), err
136+
}
137+
94138
// NodeUnpublishVolume unmount the volume from the target path
95139
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
96140
if len(req.GetVolumeId()) == 0 {
@@ -213,14 +257,17 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
213257

214258
klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\ncontext %v\nmountflags %v\nmountOptions %v\nargs %v\nserverAddress %v",
215259
targetPath, protocol, volumeID, attrib, mountFlags, mountOptions, args, serverAddress)
216-
cmd := exec.Command("blobfuse", strings.Split(args, " ")...)
217260

218261
authEnv = append(authEnv, "AZURE_STORAGE_ACCOUNT="+accountName, "AZURE_STORAGE_BLOB_ENDPOINT="+serverAddress)
219-
cmd.Env = append(os.Environ(), authEnv...)
262+
var output string
263+
if d.enableBlobfuseProxy {
264+
output, err = d.mountBlobfuseWithProxy(args, authEnv)
265+
} else {
266+
output, err = d.mountBlobfuseInsideDriver(args, authEnv)
267+
}
220268

221-
output, err := cmd.CombinedOutput()
222269
if err != nil {
223-
err = fmt.Errorf("Mount failed with error: %v, output: %v", err, string(output))
270+
err = fmt.Errorf("Mount failed with error: %v, output: %v", err, output)
224271
klog.Errorf("%v", err)
225272
notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
226273
if mntErr != nil {

pkg/blob/nodeserver_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"syscall"
2727
"testing"
2828

29+
"google.golang.org/grpc"
2930
"google.golang.org/grpc/codes"
3031
"google.golang.org/grpc/status"
3132

@@ -132,6 +133,16 @@ func TestEnsureMountPoint(t *testing.T) {
132133
assert.NoError(t, err)
133134
}
134135

136+
func TestNewMountClient(t *testing.T) {
137+
conn, _ := grpc.DialContext(context.TODO(), "unix://tmp/proxy.sock", grpc.WithInsecure())
138+
// No need to check for error because the socket is not available and
139+
// will result in following error: failed to build resolver: invalid (non-empty) authority: tmp
140+
client := NewMountClient(conn)
141+
valueType := reflect.TypeOf(client).String()
142+
fmt.Println(valueType)
143+
assert.Equal(t, valueType, "*blob.MountClient")
144+
}
145+
135146
func TestNodePublishVolume(t *testing.T) {
136147
volumeCap := csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER}
137148
createDirError := status.Errorf(codes.Internal,
@@ -559,3 +570,21 @@ func TestNodeExpandVolume(t *testing.T) {
559570
t.Errorf("Unexpected error: %v", err)
560571
}
561572
}
573+
574+
func TestMountBlobfuseWithProxy(t *testing.T) {
575+
args := "--tmp-path /tmp"
576+
authEnv := []string{"username=blob", "authkey=blob"}
577+
d := NewFakeDriver()
578+
_, err := d.mountBlobfuseWithProxy(args, authEnv)
579+
// should be context.deadlineExceededError{} error
580+
assert.NotNil(t, err)
581+
}
582+
583+
func TestMountBlobfuseInsideDriver(t *testing.T) {
584+
args := "--tmp-path /tmp"
585+
authEnv := []string{"username=blob", "authkey=blob"}
586+
d := NewFakeDriver()
587+
_, err := d.mountBlobfuseInsideDriver(args, authEnv)
588+
// the error should be of type exec.ExitError
589+
assert.NotNil(t, err)
590+
}

pkg/blobplugin/main.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,14 @@ func init() {
3636
}
3737

3838
var (
39-
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
40-
nodeID = flag.String("nodeid", "", "node id")
41-
version = flag.Bool("version", false, "Print the version and exit.")
42-
metricsAddress = flag.String("metrics-address", "0.0.0.0:29634", "export the metrics")
43-
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
39+
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
40+
blobfuseProxyEndpoint = flag.String("blobfuse-proxy-endpoint", "unix://tmp/blobfuse-proxy.sock", "blobfuse-proxy endpoint")
41+
nodeID = flag.String("nodeid", "", "node id")
42+
version = flag.Bool("version", false, "Print the version and exit.")
43+
metricsAddress = flag.String("metrics-address", "0.0.0.0:29634", "export the metrics")
44+
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
45+
enableBlobfuseProxy = flag.Bool("enable-blobfuse-proxy", false, "Whether supports using Blobfuse proxy for mounts")
46+
blobfuseProxyConnTimout = flag.Int("blobfuse-proxy-connect-timeout", 5, "blobfuse proxy connection timeout(seconds)")
4447
)
4548

4649
func main() {
@@ -61,7 +64,7 @@ func main() {
6164
}
6265

6366
func handle() {
64-
driver := blob.NewDriver(*nodeID)
67+
driver := blob.NewDriver(*nodeID, *blobfuseProxyEndpoint, *enableBlobfuseProxy, *blobfuseProxyConnTimout)
6568
if driver == nil {
6669
klog.Fatalln("Failed to initialize Azure Blob Storage CSI driver")
6770
}

0 commit comments

Comments
 (0)