Skip to content

Commit 618cfcb

Browse files
author
Power Cloud Robot
authored
Merge pull request #63 from Madhan-SWE/volume_locks
Volume locks added
2 parents aa7a5f7 + 7cb13f1 commit 618cfcb

File tree

3 files changed

+115
-0
lines changed

3 files changed

+115
-0
lines changed

pkg/driver/controller.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ var (
4848
type controllerService struct {
4949
cloud cloud.Cloud
5050
driverOptions *Options
51+
volumeLocks *util.VolumeLocks
5152
}
5253

5354
var (
@@ -71,6 +72,7 @@ func newControllerService(driverOptions *Options) controllerService {
7172
return controllerService{
7273
cloud: c,
7374
driverOptions: driverOptions,
75+
volumeLocks: util.NewVolumeLocks(),
7476
}
7577
}
7678

@@ -81,6 +83,11 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
8183
return nil, status.Error(codes.InvalidArgument, "Volume name not provided")
8284
}
8385

86+
if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
87+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volName)
88+
}
89+
defer d.volumeLocks.Release(volName)
90+
8491
volSizeBytes, err := getVolSizeBytes(req)
8592
if err != nil {
8693
return nil, err
@@ -129,6 +136,11 @@ func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVol
129136
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
130137
}
131138

139+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
140+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
141+
}
142+
defer d.volumeLocks.Release(volumeID)
143+
132144
if _, err := d.cloud.GetDiskByID(volumeID); err != nil {
133145
if err == cloud.ErrNotFound {
134146
klog.V(4).Info("DeleteVolume: volume not found, returning with success")
@@ -150,6 +162,11 @@ func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *cs
150162
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
151163
}
152164

165+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
166+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
167+
}
168+
defer d.volumeLocks.Release(volumeID)
169+
153170
nodeID := req.GetNodeId()
154171
if len(nodeID) == 0 {
155172
return nil, status.Error(codes.InvalidArgument, "Node ID not provided")
@@ -208,6 +225,11 @@ func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, req *
208225
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
209226
}
210227

228+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
229+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
230+
}
231+
defer d.volumeLocks.Release(volumeID)
232+
211233
nodeID := req.GetNodeId()
212234
if len(nodeID) == 0 {
213235
return nil, status.Error(codes.InvalidArgument, "Node ID not provided")
@@ -294,6 +316,11 @@ func (d *controllerService) ControllerExpandVolume(ctx context.Context, req *csi
294316
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
295317
}
296318

319+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
320+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
321+
}
322+
defer d.volumeLocks.Release(volumeID)
323+
297324
capRange := req.GetCapacityRange()
298325
if capRange == nil {
299326
return nil, status.Error(codes.InvalidArgument, "Capacity range not provided")

pkg/driver/node.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/container-storage-interface/spec/lib/go/csi"
2727
"github.com/ppc64le-cloud/powervs-csi-driver/pkg/cloud"
2828
"github.com/ppc64le-cloud/powervs-csi-driver/pkg/fibrechannel"
29+
"github.com/ppc64le-cloud/powervs-csi-driver/pkg/util"
2930
"google.golang.org/grpc/codes"
3031
"google.golang.org/grpc/status"
3132
"k8s.io/klog/v2"
@@ -64,6 +65,7 @@ type nodeService struct {
6465
mounter Mounter
6566
driverOptions *Options
6667
pvmInstanceId string
68+
volumeLocks *util.VolumeLocks
6769
}
6870

6971
// newNodeService creates a new node service
@@ -85,6 +87,7 @@ func newNodeService(driverOptions *Options) nodeService {
8587
mounter: newNodeMounter(),
8688
driverOptions: driverOptions,
8789
pvmInstanceId: metadata.GetPvmInstanceId(),
90+
volumeLocks: util.NewVolumeLocks(),
8891
}
8992
}
9093

@@ -96,6 +99,11 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
9699
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
97100
}
98101

102+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
103+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
104+
}
105+
defer d.volumeLocks.Release(volumeID)
106+
99107
target := req.GetStagingTargetPath()
100108
if len(target) == 0 {
101109
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
@@ -195,6 +203,11 @@ func (d *nodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
195203
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
196204
}
197205

206+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
207+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
208+
}
209+
defer d.volumeLocks.Release(volumeID)
210+
198211
target := req.GetStagingTargetPath()
199212
if len(target) == 0 {
200213
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
@@ -254,6 +267,11 @@ func (d *nodeService) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
254267
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
255268
}
256269

270+
if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
271+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
272+
}
273+
defer d.volumeLocks.Release(volumeID)
274+
257275
args := []string{"-o", "source", "--noheadings", "--target", req.GetVolumePath()}
258276
output, err := d.mounter.Command("findmnt", args...).Output()
259277
if err != nil {
@@ -294,6 +312,12 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis
294312
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
295313
}
296314

315+
// Acquire a lock on the target path instead of volumeID, since we do not want to serialize multiple node publish calls on the same volume.
316+
if acquired := d.volumeLocks.TryAcquire(target); !acquired {
317+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, target)
318+
}
319+
defer d.volumeLocks.Release(target)
320+
297321
volCap := req.GetVolumeCapability()
298322
if volCap == nil {
299323
return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
@@ -334,6 +358,12 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
334358
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
335359
}
336360

361+
// Acquire a lock on the target path instead of volumeID, since we do not want to serialize multiple node publish calls on the same volume.
362+
if acquired := d.volumeLocks.TryAcquire(target); !acquired {
363+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, target)
364+
}
365+
defer d.volumeLocks.Release(target)
366+
337367
klog.V(5).Infof("NodeUnpublishVolume: unmounting %s", target)
338368
err := d.mounter.Unmount(target)
339369
if err != nil {

pkg/util/volume_lock.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package util
18+
19+
import (
20+
"sync"
21+
22+
"k8s.io/apimachinery/pkg/util/sets"
23+
)
24+
25+
const (
26+
VolumeOperationAlreadyExistsFmt = "An operation with the given volume key %s already exists"
27+
)
28+
29+
// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
30+
// with an ongoing operation.
31+
type VolumeLocks struct {
32+
locks sets.String
33+
mux sync.Mutex
34+
}
35+
36+
func NewVolumeLocks() *VolumeLocks {
37+
return &VolumeLocks{
38+
locks: sets.NewString(),
39+
}
40+
}
41+
42+
// TryAcquire tries to acquire the lock for operating on volumeID and returns true if successful.
43+
// If another operation is already using volumeID, returns false.
44+
func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
45+
vl.mux.Lock()
46+
defer vl.mux.Unlock()
47+
if vl.locks.Has(volumeID) {
48+
return false
49+
}
50+
vl.locks.Insert(volumeID)
51+
return true
52+
}
53+
54+
func (vl *VolumeLocks) Release(volumeID string) {
55+
vl.mux.Lock()
56+
defer vl.mux.Unlock()
57+
vl.locks.Delete(volumeID)
58+
}

0 commit comments

Comments
 (0)