Skip to content

Commit 1296f78

Browse files
authored
Merge pull request #321 from harshika-kashyap/concurrent_changes
fix: Handle concurrent operation of NodeUnstage with NodeStage and NodeUnpublish with NodePublish
2 parents f34a7eb + b5bf4d9 commit 1296f78

File tree

4 files changed

+162
-0
lines changed

4 files changed

+162
-0
lines changed

pkg/blob/blob.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ type Driver struct {
9494
cloud *azure.Cloud
9595
mounter *mount.SafeFormatAndMount
9696
volLockMap *util.LockMap
97+
// A map storing all volumes with ongoing operations so that additional operations
98+
// for that same volume (as defined by VolumeID) return an Aborted error
99+
volumeLocks *volumeLocks
97100
}
98101

99102
// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
@@ -104,6 +107,7 @@ func NewDriver(nodeID string) *Driver {
104107
driver.Version = driverVersion
105108
driver.NodeID = nodeID
106109
driver.volLockMap = util.NewLockMap()
110+
driver.volumeLocks = newVolumeLocks()
107111
return &driver
108112
}
109113

pkg/blob/nodeserver.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
6060
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
6161
}
6262

63+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
64+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
65+
}
66+
defer d.volumeLocks.Release(volumeID)
67+
6368
mountOptions := []string{"bind"}
6469
if req.GetReadonly() {
6570
mountOptions = append(mountOptions, "ro")
@@ -97,6 +102,11 @@ func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublish
97102
targetPath := req.GetTargetPath()
98103
volumeID := req.GetVolumeId()
99104

105+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
106+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
107+
}
108+
defer d.volumeLocks.Release(volumeID)
109+
100110
klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
101111
err := mount.CleanupMountPoint(targetPath, d.mounter, false)
102112
if err != nil {
@@ -122,6 +132,11 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
122132
return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
123133
}
124134

135+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
136+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
137+
}
138+
defer d.volumeLocks.Release(volumeID)
139+
125140
mnt, err := d.ensureMountPoint(targetPath)
126141
if err != nil {
127142
return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", targetPath, err)
@@ -239,6 +254,11 @@ func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolu
239254
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
240255
}
241256

257+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
258+
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
259+
}
260+
defer d.volumeLocks.Release(volumeID)
261+
242262
klog.V(2).Infof("NodeUnstageVolume: volume %s unmounting on %s", volumeID, stagingTargetPath)
243263
err := mount.CleanupMountPoint(stagingTargetPath, d.mounter, false)
244264
if err != nil {

pkg/blob/nodeserver_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,10 @@ func TestNodePublishVolume(t *testing.T) {
143143
}
144144
tests := []struct {
145145
desc string
146+
setup func(*Driver)
146147
req csi.NodePublishVolumeRequest
147148
expectedErr error
149+
cleanup func(*Driver)
148150
}{
149151
{
150152
desc: "Volume capabilities missing",
@@ -169,6 +171,21 @@ func TestNodePublishVolume(t *testing.T) {
169171
StagingTargetPath: sourceTest},
170172
expectedErr: status.Error(codes.InvalidArgument, "Target path not provided"),
171173
},
174+
{
175+
desc: "[Error] Volume operation in progress",
176+
setup: func(d *Driver) {
177+
d.volumeLocks.TryAcquire("vol_1")
178+
},
179+
req: csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap},
180+
VolumeId: "vol_1",
181+
TargetPath: targetTest,
182+
StagingTargetPath: sourceTest,
183+
Readonly: true},
184+
expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
185+
cleanup: func(d *Driver) {
186+
d.volumeLocks.Release("vol_1")
187+
},
188+
},
172189
{
173190
desc: "Valid request read only",
174191
req: csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap},
@@ -210,11 +227,17 @@ func TestNodePublishVolume(t *testing.T) {
210227
}
211228

212229
for _, test := range tests {
230+
if test.setup != nil {
231+
test.setup(d)
232+
}
213233
_, err := d.NodePublishVolume(context.Background(), &test.req)
214234

215235
if !reflect.DeepEqual(err, test.expectedErr) {
216236
t.Errorf("Desc: %s - Unexpected error: %v - Expected: %v", test.desc, err, test.expectedErr)
217237
}
238+
if test.cleanup != nil {
239+
test.cleanup(d)
240+
}
218241
}
219242

220243
// Clean up
@@ -230,8 +253,10 @@ func TestNodePublishVolume(t *testing.T) {
230253
func TestNodeUnpublishVolume(t *testing.T) {
231254
tests := []struct {
232255
desc string
256+
setup func(*Driver)
233257
req csi.NodeUnpublishVolumeRequest
234258
expectedErr error
259+
cleanup func(*Driver)
235260
}{
236261
{
237262
desc: "Volume ID missing",
@@ -243,6 +268,17 @@ func TestNodeUnpublishVolume(t *testing.T) {
243268
req: csi.NodeUnpublishVolumeRequest{VolumeId: "vol_1"},
244269
expectedErr: status.Error(codes.InvalidArgument, "Target path missing in request"),
245270
},
271+
{
272+
desc: "[Error] Volume operation in progress",
273+
setup: func(d *Driver) {
274+
d.volumeLocks.TryAcquire("vol_1")
275+
},
276+
req: csi.NodeUnpublishVolumeRequest{TargetPath: targetTest, VolumeId: "vol_1"},
277+
expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
278+
cleanup: func(d *Driver) {
279+
d.volumeLocks.Release("vol_1")
280+
},
281+
},
246282
{
247283
desc: "Valid request",
248284
req: csi.NodeUnpublishVolumeRequest{TargetPath: "./abc.go", VolumeId: "vol_1"},
@@ -263,11 +299,17 @@ func TestNodeUnpublishVolume(t *testing.T) {
263299
}
264300

265301
for _, test := range tests {
302+
if test.setup != nil {
303+
test.setup(d)
304+
}
266305
_, err := d.NodeUnpublishVolume(context.Background(), &test.req)
267306

268307
if !reflect.DeepEqual(err, test.expectedErr) {
269308
t.Errorf("Unexpected error: %v", err)
270309
}
310+
if test.cleanup != nil {
311+
test.cleanup(d)
312+
}
271313
}
272314

273315
//Clean up
@@ -280,6 +322,7 @@ func TestNodeUnpublishVolume(t *testing.T) {
280322
}
281323

282324
func TestNodeStageVolume(t *testing.T) {
325+
volumeCap := csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER}
283326
testCases := []struct {
284327
name string
285328
testFunc func(t *testing.T)
@@ -325,13 +368,32 @@ func TestNodeStageVolume(t *testing.T) {
325368
}
326369
},
327370
},
371+
{
372+
name: "Volume operation in progress",
373+
testFunc: func(t *testing.T) {
374+
req := &csi.NodeStageVolumeRequest{
375+
VolumeId: "unit-test",
376+
StagingTargetPath: "unit-test",
377+
VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap},
378+
}
379+
d := NewFakeDriver()
380+
d.volumeLocks.TryAcquire("unit-test")
381+
defer d.volumeLocks.Release("unit-test")
382+
_, err := d.NodeStageVolume(context.TODO(), req)
383+
expectedErr := status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "unit-test"))
384+
if !reflect.DeepEqual(err, expectedErr) {
385+
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
386+
}
387+
},
388+
},
328389
}
329390
for _, tc := range testCases {
330391
t.Run(tc.name, tc.testFunc)
331392
}
332393
}
333394

334395
func TestNodeUnstageVolume(t *testing.T) {
396+
volumeCap := csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER}
335397
testCases := []struct {
336398
name string
337399
testFunc func(t *testing.T)
@@ -362,6 +424,24 @@ func TestNodeUnstageVolume(t *testing.T) {
362424
}
363425
},
364426
},
427+
{
428+
name: "Volume operation in progress",
429+
testFunc: func(t *testing.T) {
430+
req := &csi.NodeStageVolumeRequest{
431+
VolumeId: "unit-test",
432+
StagingTargetPath: "unit-test",
433+
VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap},
434+
}
435+
d := NewFakeDriver()
436+
d.volumeLocks.TryAcquire("unit-test")
437+
defer d.volumeLocks.Release("unit-test")
438+
_, err := d.NodeStageVolume(context.TODO(), req)
439+
expectedErr := status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "unit-test"))
440+
if !reflect.DeepEqual(err, expectedErr) {
441+
t.Errorf("actualErr: (%v), expectedErr: (%v)", err, expectedErr)
442+
}
443+
},
444+
},
365445
{
366446
name: "mount point not exist ",
367447
testFunc: func(t *testing.T) {

pkg/blob/volume_lock.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package blob
18+
19+
import (
20+
"sync"
21+
22+
"k8s.io/apimachinery/pkg/util/sets"
23+
)
24+
25+
// volumeOperationAlreadyExistsFmt is the message format used when an operation
// is rejected because the given volume ID is already locked. It takes a single
// %s verb: the volume ID.
const volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
28+
29+
// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
30+
// with an ongoing operation.
31+
type volumeLocks struct {
32+
locks sets.String
33+
mux sync.Mutex
34+
}
35+
36+
func newVolumeLocks() *volumeLocks {
37+
return &volumeLocks{
38+
locks: sets.NewString(),
39+
}
40+
}
41+
42+
// TryAcquire tries to acquire the lock for operating on volumeID and returns true if successful.
43+
// If another operation is already using volumeID, returns false.
44+
func (vl *volumeLocks) TryAcquire(volumeID string) bool {
45+
vl.mux.Lock()
46+
defer vl.mux.Unlock()
47+
if vl.locks.Has(volumeID) {
48+
return false
49+
}
50+
vl.locks.Insert(volumeID)
51+
return true
52+
}
53+
54+
func (vl *volumeLocks) Release(volumeID string) {
55+
vl.mux.Lock()
56+
defer vl.mux.Unlock()
57+
vl.locks.Delete(volumeID)
58+
}

0 commit comments

Comments
 (0)