Skip to content

Commit 86e735e

Browse files
committed
feat: use NodeStageVolume
modify PublishVolume add stagingVolume cap test: fix integration test failure fix int test failure fix int test and print more logs in e2e test fix test failure fix int test
1 parent 1ea336b commit 86e735e

File tree

7 files changed

+269
-63
lines changed

7 files changed

+269
-63
lines changed

pkg/blobfuse/blobfuse.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func (d *Driver) Run(endpoint string) {
100100
})
101101

102102
d.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
103+
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
103104
csi.NodeServiceCapability_RPC_UNKNOWN,
104105
})
105106

pkg/blobfuse/nodeserver.go

Lines changed: 141 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525

2626
"github.com/container-storage-interface/spec/lib/go/csi"
2727
"k8s.io/klog"
28-
k8sutil "k8s.io/kubernetes/pkg/volume/util"
28+
"k8s.io/kubernetes/pkg/volume/util"
2929

3030
"google.golang.org/grpc/codes"
3131
"google.golang.org/grpc/status"
@@ -45,7 +45,83 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
4545
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
4646
}
4747

48+
source := req.GetStagingTargetPath()
49+
if len(source) == 0 {
50+
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
51+
}
52+
53+
target := req.GetTargetPath()
54+
if len(target) == 0 {
55+
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
56+
}
57+
58+
mountOptions := getNodePublishMountOptions(req)
59+
60+
if err := d.ensureMountPoint(target); err != nil {
61+
return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", target, err)
62+
}
63+
64+
readOnly := req.GetReadonly()
65+
volumeID := req.GetVolumeId()
66+
attrib := req.GetVolumeContext()
67+
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
68+
69+
klog.V(2).Infof("target %v\n\nreadonly %v\nvolumeId %v\nContext %v\nmountflags %v\n",
70+
target, readOnly, volumeID, attrib, mountFlags)
71+
72+
klog.V(2).Infof("NodePublishVolume: creating dir %s", target)
73+
if err := d.mounter.MakeDir(target); err != nil {
74+
return nil, status.Errorf(codes.Internal, "Could not create dir %q: %v", target, err)
75+
}
76+
77+
klog.V(2).Infof("NodePublishVolume: mounting %s at %s with mountOptions: %v", source, target, mountOptions)
78+
if err := d.mounter.Mount(source, target, "", mountOptions); err != nil {
79+
if removeErr := os.Remove(target); removeErr != nil {
80+
return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr)
81+
}
82+
return nil, status.Errorf(codes.Internal, "Could not mount %q at %q: %v", source, target, err)
83+
}
84+
klog.V(2).Infof("NodePublishVolume: mount %s at %s successfully", source, target)
85+
86+
return &csi.NodePublishVolumeResponse{}, nil
87+
}
88+
89+
// NodeUnpublishVolume unmount the volume from the target path
90+
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
91+
klog.V(2).Infof("NodeUnPublishVolume: called with args %+v", *req)
92+
if len(req.GetVolumeId()) == 0 {
93+
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
94+
}
95+
if len(req.GetTargetPath()) == 0 {
96+
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
97+
}
4898
targetPath := req.GetTargetPath()
99+
volumeID := req.GetVolumeId()
100+
101+
klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
102+
err := d.mounter.Unmount(req.GetTargetPath())
103+
if err != nil {
104+
return nil, status.Error(codes.Internal, err.Error())
105+
}
106+
klog.V(2).Infof("NodeUnpublishVolume: unmount volume %s on %s successfully", volumeID, targetPath)
107+
108+
return &csi.NodeUnpublishVolumeResponse{}, nil
109+
}
110+
111+
// NodeStageVolume mount the volume to a staging path
112+
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
113+
if len(req.GetVolumeId()) == 0 {
114+
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
115+
}
116+
targetPath := req.GetStagingTargetPath()
117+
if len(targetPath) == 0 {
118+
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
119+
}
120+
volumeCapability := req.GetVolumeCapability()
121+
if volumeCapability == nil {
122+
return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
123+
}
124+
49125
notMnt, err := d.mounter.IsLikelyNotMountPoint(targetPath)
50126
if err != nil && !os.IsNotExist(err) {
51127
return nil, err
@@ -59,7 +135,7 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
59135
// testing original mount point, make sure the mount link is valid
60136
if _, err := ioutil.ReadDir(targetPath); err == nil {
61137
klog.V(2).Infof("azureFile - already mounted to target %s", targetPath)
62-
return &csi.NodePublishVolumeResponse{}, nil
138+
return &csi.NodeStageVolumeResponse{}, nil
63139
}
64140
// todo: mount link is invalid, now unmount and remount later (built-in functionality)
65141
klog.Warningf("azureFile - ReadDir %s failed with %v, unmount this directory", targetPath, err)
@@ -70,27 +146,23 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
70146
// notMnt = true
71147
}
72148

73-
fsType := req.GetVolumeCapability().GetMount().GetFsType()
74-
75-
readOnly := req.GetReadonly()
76149
volumeID := req.GetVolumeId()
77-
attrib := req.GetVolumeContext()
150+
fsType := req.GetVolumeCapability().GetMount().GetFsType()
78151
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
152+
attrib := req.GetVolumeContext()
79153
secrets := req.GetSecrets()
80154

81155
accountName, accountKey, accountSasToken, containerName, err := d.getStorageAccountAndContainer(ctx, volumeID, attrib, secrets)
82156
if err != nil {
83157
return nil, err
84158
}
85159

160+
// Get mountOptions that the volume will be formatted and mounted with
86161
options := []string{"--use-https=true"}
87-
if readOnly {
88-
options = append(options, "-o ro")
89-
}
90-
mountOptions := k8sutil.JoinMountOptions(mountFlags, options)
162+
mountOptions := util.JoinMountOptions(mountFlags, options)
91163

92-
klog.V(2).Infof("target %v\nfstype %v\n\nreadonly %v\nvolumeId %v\ncontext %v\nmountflags %v\nmountOptions %v\n",
93-
targetPath, fsType, readOnly, volumeID, attrib, mountFlags, mountOptions)
164+
klog.V(2).Infof("target %v\nfstype %v\n\nvolumeId %v\ncontext %v\nmountflags %v\nmountOptions %v\n",
165+
targetPath, fsType, volumeID, attrib, mountFlags, mountOptions)
94166

95167
args := targetPath + " " + "--tmp-path=/mnt/" + volumeID + " " + "--container-name=" + containerName
96168
for _, opt := range mountOptions {
@@ -134,58 +206,28 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
134206
return nil, err
135207
}
136208

137-
return &csi.NodePublishVolumeResponse{}, nil
209+
return &csi.NodeStageVolumeResponse{}, nil
138210
}
139211

140-
// NodeUnpublishVolume unmount the volume from the target path
141-
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
142-
if len(req.GetVolumeId()) == 0 {
143-
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
144-
}
145-
if len(req.GetTargetPath()) == 0 {
146-
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
147-
}
148-
targetPath := req.GetTargetPath()
212+
// NodeUnstageVolume unmount the volume from the staging path
213+
func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
214+
klog.V(2).Infof("NodeUnstageVolume: called with args %+v", *req)
149215
volumeID := req.GetVolumeId()
150-
151-
// Unmounting the image
152-
err := d.mounter.Unmount(req.GetTargetPath())
153-
if err != nil {
154-
return nil, status.Error(codes.Internal, err.Error())
216+
if len(volumeID) == 0 {
217+
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
155218
}
156-
klog.V(4).Infof("blobfuse: volume %s/%s has been unmounted.", targetPath, volumeID)
157219

158-
return &csi.NodeUnpublishVolumeResponse{}, nil
159-
}
160-
161-
// NodeStageVolume mount the volume to a staging path
162-
// todo: we may implement this for blobfuse
163-
// The reason that mounting is a two step operation is
164-
// because Kubernetes allows you to use a single volume by multiple pods.
165-
// This is allowed when the storage system supports it or if all pods run on the same node.
166-
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
167-
if len(req.GetVolumeId()) == 0 {
168-
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
169-
}
170-
if len(req.GetStagingTargetPath()) == 0 {
171-
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
220+
target := req.GetStagingTargetPath()
221+
if len(target) == 0 {
222+
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
172223
}
173224

174-
return &csi.NodeStageVolumeResponse{}, nil
175-
}
176-
177-
// NodeUnstageVolume unmount the volume from the staging path
178-
// todo: we may implement this for blobfuse
179-
// The reason that mounting is a two step operation is
180-
// because Kubernetes allows you to use a single volume by multiple pods.
181-
// This is allowed when the storage system supports it or if all pods run on the same node.
182-
func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
183-
if len(req.GetVolumeId()) == 0 {
184-
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
185-
}
186-
if len(req.GetStagingTargetPath()) == 0 {
187-
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
225+
klog.V(2).Infof("NodeUnstageVolume: unmounting %s", target)
226+
err := d.mounter.Unmount(target)
227+
if err != nil {
228+
return nil, status.Errorf(codes.Internal, "Could not unmount target %q: %v", target, err)
188229
}
230+
klog.V(2).Infof("NodeUnstageVolume: unmount %s successfully", target)
189231

190232
return &csi.NodeUnstageVolumeResponse{}, nil
191233
}
@@ -217,3 +259,46 @@ func (d *Driver) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeSt
217259
func (d *Driver) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
218260
return nil, status.Error(codes.Unimplemented, fmt.Sprintf("NodeExpandVolume is not yet implemented"))
219261
}
262+
263+
func getNodePublishMountOptions(req *csi.NodePublishVolumeRequest) []string {
264+
mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
265+
mountOptions := []string{"bind"}
266+
if req.GetReadonly() {
267+
mountOptions = append(mountOptions, "ro")
268+
}
269+
mountOptions = util.JoinMountOptions(mountFlags, mountOptions)
270+
271+
return mountOptions
272+
}
273+
274+
// ensureMountPoint: ensure mount point to be valid.
275+
// If it is not existed, it will be created.
276+
func (d *Driver) ensureMountPoint(target string) error {
277+
notMnt, err := d.mounter.IsLikelyNotMountPoint(target)
278+
if err != nil && !os.IsNotExist(err) {
279+
return err
280+
}
281+
282+
if !notMnt {
283+
// testing original mount point, make sure the mount link is valid
284+
_, err := ioutil.ReadDir(target)
285+
if err == nil {
286+
klog.V(2).Infof("already mounted to target %s", target)
287+
return nil
288+
}
289+
// mount link is invalid, now unmount and remount later
290+
klog.Warningf("ReadDir %s failed with %v, unmount this directory", target, err)
291+
if err := d.mounter.Unmount(target); err != nil {
292+
klog.Errorf("Unmount directory %s failed with %v", target, err)
293+
return err
294+
}
295+
// notMnt = true
296+
}
297+
298+
if err := os.MkdirAll(target, 0750); err != nil {
299+
klog.Errorf("mkdir failed on target: %s (%v)", target, err)
300+
return err
301+
}
302+
303+
return nil
304+
}

pkg/blobfuse/nodeserver_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package blobfuse
18+
19+
import (
20+
"reflect"
21+
"testing"
22+
23+
"github.com/container-storage-interface/spec/lib/go/csi"
24+
)
25+
26+
func TestGetNodePublishMountOptions(t *testing.T) {
27+
tests := []struct {
28+
request *csi.NodePublishVolumeRequest
29+
expected []string
30+
}{
31+
{
32+
request: &csi.NodePublishVolumeRequest{
33+
VolumeCapability: &csi.VolumeCapability{},
34+
},
35+
expected: []string{"bind"},
36+
},
37+
{
38+
request: &csi.NodePublishVolumeRequest{
39+
VolumeCapability: &csi.VolumeCapability{},
40+
Readonly: true,
41+
},
42+
expected: []string{"bind", "ro"},
43+
},
44+
{
45+
request: &csi.NodePublishVolumeRequest{
46+
VolumeCapability: &csi.VolumeCapability{
47+
AccessType: &csi.VolumeCapability_Mount{
48+
Mount: &csi.VolumeCapability_MountVolume{
49+
MountFlags: []string{"rw"},
50+
},
51+
},
52+
},
53+
},
54+
expected: []string{"bind", "rw"},
55+
},
56+
}
57+
58+
for _, test := range tests {
59+
result := getNodePublishMountOptions(test.request)
60+
if !reflect.DeepEqual(result, test.expected) {
61+
t.Errorf("input: %v, getFStype result: %v, expected: %v", test.request, result, test.expected)
62+
}
63+
}
64+
}

test/e2e/suite_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,16 @@ var _ = ginkgo.AfterSuite(func() {
117117
projectRoot, err := os.Getwd()
118118
gomega.Expect(err).NotTo(gomega.HaveOccurred())
119119
gomega.Expect(strings.HasSuffix(projectRoot, "blobfuse-csi-driver")).To(gomega.Equal(true))
120+
121+
log.Println("===================blobfuse log===================")
122+
cmdSh := exec.Command("sh", "test/utils/blobfuse_log.sh")
123+
cmdSh.Dir = projectRoot
124+
cmdSh.Stdout = os.Stdout
125+
cmdSh.Stderr = os.Stderr
126+
err = cmdSh.Run()
127+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
128+
log.Println("===================================================")
129+
120130
cmd := exec.Command("make", "e2e-teardown")
121131
cmd.Dir = projectRoot
122132
cmd.Stdout = os.Stdout

test/integration/run-test.sh

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ function cleanup {
2525
readonly volname="citest-$(date +%s)"
2626
readonly volsize="2147483648"
2727
readonly endpoint="$1"
28-
readonly target_path="$2"
29-
readonly resource_group="$3"
30-
readonly cloud="$4"
28+
readonly staging_target_path="$2"
29+
readonly target_path="$3"
30+
readonly resource_group="$4"
31+
readonly cloud="$5"
3132

3233
echo "Begin to run integration test on $cloud..."
3334

@@ -53,13 +54,20 @@ storage_account_name="$(echo "$volumeid" | awk -F# '{print $2}')"
5354
csc controller validate-volume-capabilities --endpoint "$endpoint" --cap 1,block "$volumeid"
5455

5556
if [[ "$cloud" != "AzureChinaCloud" ]]; then
56-
echo "mount volume test:"
57-
csc node publish --endpoint "$endpoint" --cap 1,block --target-path "$target_path" "$volumeid"
57+
echo "stage volume test:"
58+
csc node stage --endpoint "$endpoint" --cap 1,block --staging-target-path "$staging_target_path" "$volumeid"
59+
60+
echo "publish volume test:"
61+
csc node publish --endpoint "$endpoint" --cap 1,block --staging-target-path "$staging_target_path" --target-path "$target_path" "$volumeid"
5862
sleep 2
5963

60-
echo "Unmount volume test:"
64+
echo "unpublish volume test:"
6165
csc node unpublish --endpoint "$endpoint" --target-path "$target_path" "$volumeid"
6266
sleep 2
67+
68+
echo "unstage volume test:"
69+
csc node unstage --endpoint "$endpoint" --staging-target-path "$staging_target_path" "$volumeid"
70+
sleep 2
6371
fi
6472

6573
echo "Delete volume test:"

test/integration/run-tests-all-clouds.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@ mkdir -p /usr/blob
2626
cp test/artifacts/blobfuse /usr/bin/blobfuse
2727

2828
apt update && apt install libfuse2 -y
29-
test/integration/run-test.sh "tcp://127.0.0.1:10000" "/tmp/testmount1" "$resource_group" "$cloud"
29+
test/integration/run-test.sh "tcp://127.0.0.1:10000" "/tmp/stagingtargetpath" "/tmp/targetpath" "$resource_group" "$cloud"

0 commit comments

Comments
 (0)