Skip to content

Commit 01d99b1

Browse files
authored
Merge pull request #38 from Leaseweb/lock_vol_ops
Add locking to all volume operations
2 parents 08bca87 + 12b4204 commit 01d99b1

File tree

10 files changed

+631
-65
lines changed

10 files changed

+631
-65
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ test-sanity:
5050
setup-external-e2e: test/e2e/e2e.test test/e2e/ginkgo
5151

5252
test/e2e/e2e.test test/e2e/ginkgo:
53-
curl --location https://dl.k8s.io/v1.19.5/kubernetes-test-linux-amd64.tar.gz | \
53+
curl --location https://dl.k8s.io/v1.27.5/kubernetes-test-linux-amd64.tar.gz | \
5454
tar --strip-components=3 -C test/e2e -zxf - kubernetes/test/bin/e2e.test kubernetes/test/bin/ginkgo
5555

5656
.PHONY: test-e2e
5757
test-e2e: setup-external-e2e
58-
bash ./test/e2e/run.sh
58+
bash ./test/e2e/run.sh

deploy/k8s/controller-deployment.yaml

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ metadata:
44
name: cloudstack-csi-controller
55
namespace: kube-system
66
spec:
7-
replicas: 1
7+
replicas: 3
88
strategy:
99
type: RollingUpdate
1010
rollingUpdate:
@@ -40,19 +40,44 @@ spec:
4040
env:
4141
- name: CSI_ENDPOINT
4242
value: unix:///var/lib/csi/sockets/pluginproxy/csi.sock
43+
securityContext:
44+
runAsNonRoot: true
45+
runAsUser: 65532
46+
runAsGroup: 65532
4347
volumeMounts:
4448
- name: socket-dir
4549
mountPath: /var/lib/csi/sockets/pluginproxy/
4650
- name: cloudstack-conf
4751
mountPath: /etc/cloudstack-csi-driver
52+
ports:
53+
- name: healthz
54+
containerPort: 9808
55+
protocol: TCP
56+
livenessProbe:
57+
httpGet:
58+
path: /healthz
59+
port: healthz
60+
initialDelaySeconds: 30
61+
timeoutSeconds: 10
62+
periodSeconds: 180
63+
failureThreshold: 3
4864

4965
- name: external-provisioner
50-
image: registry.k8s.io/sig-storage/csi-provisioner:v3.3.1
66+
image: registry.k8s.io/sig-storage/csi-provisioner:v3.5.0
5167
imagePullPolicy: IfNotPresent
5268
args:
69+
- "--v=4"
70+
- "--timeout=300s"
5371
- "--csi-address=$(ADDRESS)"
54-
- "--v=5"
72+
- "--kube-api-qps=100"
73+
- "--kube-api-burst=100"
74+
- "--leader-election"
75+
- "--leader-election-lease-duration=120s"
76+
- "--leader-election-renew-deadline=60s"
77+
- "--leader-election-retry-period=30s"
78+
- "--default-fstype=ext4"
5579
- "--feature-gates=Topology=true"
80+
- "--strict-topology"
5681
env:
5782
- name: ADDRESS
5883
value: /var/lib/csi/sockets/pluginproxy/csi.sock
@@ -64,8 +89,27 @@ spec:
6489
image: registry.k8s.io/sig-storage/csi-attacher:v4.3.0
6590
imagePullPolicy: IfNotPresent
6691
args:
92+
- "--v=4"
93+
- "--timeout=300s"
94+
- "--csi-address=$(ADDRESS)"
95+
- "--leader-election"
96+
- "--leader-election-lease-duration=120s"
97+
- "--leader-election-renew-deadline=60s"
98+
- "--leader-election-retry-period=30s"
99+
- "--kube-api-qps=100"
100+
- "--kube-api-burst=100"
101+
env:
102+
- name: ADDRESS
103+
value: /var/lib/csi/sockets/pluginproxy/csi.sock
104+
volumeMounts:
105+
- name: socket-dir
106+
mountPath: /var/lib/csi/sockets/pluginproxy/
107+
108+
- name: liveness-probe
109+
image: registry.k8s.io/sig-storage/livenessprobe:v2.10.0
110+
args:
111+
- "--v=4"
67112
- "--csi-address=$(ADDRESS)"
68-
- "--v=5"
69113
env:
70114
- name: ADDRESS
71115
value: /var/lib/csi/sockets/pluginproxy/csi.sock

deploy/k8s/node-daemonset.yaml

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ spec:
4141
fieldPath: spec.nodeName
4242
securityContext:
4343
privileged: true
44+
capabilities:
45+
add: ["SYS_ADMIN"]
46+
allowPrivilegeEscalation: true
4447
volumeMounts:
4548
- name: plugin-dir
4649
mountPath: /csi
@@ -55,6 +58,30 @@ spec:
5558
mountPath: /run/cloud-init/
5659
- name: cloudstack-conf
5760
mountPath: /etc/cloudstack-csi-driver
61+
ports:
62+
- name: healthz
63+
containerPort: 9808
64+
protocol: TCP
65+
livenessProbe:
66+
httpGet:
67+
path: /healthz
68+
port: healthz
69+
initialDelaySeconds: 10
70+
timeoutSeconds: 5
71+
periodSeconds: 5
72+
failureThreshold: 3
73+
74+
- name: liveness-probe
75+
image: registry.k8s.io/sig-storage/livenessprobe:v2.10.0
76+
args:
77+
- "--v=4"
78+
- "--csi-address=$(ADDRESS)"
79+
env:
80+
- name: ADDRESS
81+
value: /csi/csi.sock
82+
volumeMounts:
83+
- name: plugin-dir
84+
mountPath: /csi
5885

5986
- name: node-driver-registrar
6087
image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.8.0
@@ -68,8 +95,6 @@ spec:
6895
value: /csi/csi.sock
6996
- name: DRIVER_REG_SOCK_PATH
7097
value: /var/lib/kubelet/plugins/csi.cloudstack.apache.org/csi.sock
71-
securityContext:
72-
privileged: true
7398
volumeMounts:
7499
- name: plugin-dir
75100
mountPath: /csi

pkg/driver/controller.go

Lines changed: 82 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import (
44
"context"
55
"fmt"
66
"math/rand"
7-
"sync"
87

98
"github.com/container-storage-interface/spec/lib/go/csi"
9+
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
1010
"google.golang.org/grpc/codes"
1111
"google.golang.org/grpc/status"
1212

@@ -24,15 +24,15 @@ var onlyVolumeCapAccessMode = csi.VolumeCapability_AccessMode{
2424

2525
type controllerServer struct {
2626
csi.UnimplementedControllerServer
27-
connector cloud.Interface
28-
locks map[string]*sync.Mutex
27+
connector cloud.Interface
28+
volumeLocks *util.VolumeLocks
2929
}
3030

3131
// NewControllerServer creates a new Controller gRPC server.
3232
func NewControllerServer(connector cloud.Interface) csi.ControllerServer {
3333
return &controllerServer{
34-
connector: connector,
35-
locks: make(map[string]*sync.Mutex),
34+
connector: connector,
35+
volumeLocks: util.NewVolumeLocks(),
3636
}
3737
}
3838

@@ -61,6 +61,12 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
6161
return nil, status.Errorf(codes.InvalidArgument, "Missing parameter %v", DiskOfferingKey)
6262
}
6363

64+
if acquired := cs.volumeLocks.TryAcquire(name); !acquired {
65+
ctxzap.Extract(ctx).Sugar().Errorf(util.VolumeOperationAlreadyExistsFmt, name)
66+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, name)
67+
}
68+
defer cs.volumeLocks.Release(name)
69+
6470
// Check if a volume with that name already exists
6571
if vol, err := cs.connector.GetVolumeByName(ctx, name); err == cloud.ErrNotFound {
6672
// The volume does not exist
@@ -77,6 +83,8 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
7783
Volume: &csi.Volume{
7884
VolumeId: vol.ID,
7985
CapacityBytes: vol.Size,
86+
VolumeContext: req.GetParameters(),
87+
// ContentSource: req.GetVolumeContentSource(), TODO: snapshot support
8088
AccessibleTopology: []*csi.Topology{
8189
Topology{ZoneID: vol.ZoneID}.ToCSI(),
8290
},
@@ -118,6 +126,13 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
118126
zoneID = t.ZoneID
119127
}
120128

129+
ctxzap.Extract(ctx).Sugar().Infow("Creating new volume",
130+
"name", name,
131+
"size", sizeInGB,
132+
"offering", diskOfferingID,
133+
"zone", zoneID,
134+
)
135+
121136
volID, err := cs.connector.CreateVolume(ctx, diskOfferingID, zoneID, name, sizeInGB)
122137
if err != nil {
123138
return nil, status.Errorf(codes.Internal, "Cannot create volume %s: %v", name, err.Error())
@@ -127,6 +142,8 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
127142
Volume: &csi.Volume{
128143
VolumeId: volID,
129144
CapacityBytes: util.GigaBytesToBytes(sizeInGB),
145+
VolumeContext: req.GetParameters(),
146+
// ContentSource: req.GetVolumeContentSource(), TODO: snapshot support
130147
AccessibleTopology: []*csi.Topology{
131148
Topology{ZoneID: zoneID}.ToCSI(),
132149
},
@@ -198,6 +215,17 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
198215
}
199216

200217
volumeID := req.GetVolumeId()
218+
219+
if acquired := cs.volumeLocks.TryAcquire(volumeID); !acquired {
220+
ctxzap.Extract(ctx).Sugar().Errorf(util.VolumeOperationAlreadyExistsFmt, volumeID)
221+
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
222+
}
223+
defer cs.volumeLocks.Release(volumeID)
224+
225+
ctxzap.Extract(ctx).Sugar().Infow("Deleting volume",
226+
"volumeID", volumeID,
227+
)
228+
201229
err := cs.connector.DeleteVolume(ctx, volumeID)
202230
if err != nil && err != cloud.ErrNotFound {
203231
return nil, status.Errorf(codes.Internal, "Cannot delete volume %s: %s", volumeID, err.Error())
@@ -218,15 +246,6 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
218246
}
219247
nodeID := req.GetNodeId()
220248

221-
//Ensure only one node is processing at same time
222-
lock, ok := cs.locks[nodeID]
223-
if !ok {
224-
lock = &sync.Mutex{}
225-
cs.locks[nodeID] = lock
226-
}
227-
lock.Lock()
228-
defer lock.Unlock()
229-
230249
if req.GetReadonly() {
231250
return nil, status.Error(codes.InvalidArgument, "Readonly not possible")
232251
}
@@ -238,6 +257,11 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
238257
return nil, status.Error(codes.InvalidArgument, "Access mode not accepted")
239258
}
240259

260+
ctxzap.Extract(ctx).Sugar().Infow("Initiating attaching volume",
261+
"volumeID", volumeID,
262+
"nodeID", nodeID,
263+
)
264+
241265
// Check volume
242266
vol, err := cs.connector.GetVolumeByID(ctx, volumeID)
243267
if err == cloud.ErrNotFound {
@@ -248,6 +272,11 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
248272
}
249273

250274
if vol.VirtualMachineID != "" && vol.VirtualMachineID != nodeID {
275+
ctxzap.Extract(ctx).Sugar().Errorw("Volume already attached to another node",
276+
"volumeID", volumeID,
277+
"nodeID", nodeID,
278+
"attached nodeID", vol.VirtualMachineID,
279+
)
251280
return nil, status.Error(codes.AlreadyExists, "Volume already assigned to another node")
252281
}
253282

@@ -260,18 +289,32 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
260289

261290
if vol.VirtualMachineID == nodeID {
262291
// volume already attached
263-
292+
ctxzap.Extract(ctx).Sugar().Infow("Volume already attached to node",
293+
"volumeID", volumeID,
294+
"nodeID", nodeID,
295+
"deviceID", vol.DeviceID,
296+
)
264297
publishContext := map[string]string{
265298
deviceIDContextKey: vol.DeviceID,
266299
}
267300
return &csi.ControllerPublishVolumeResponse{PublishContext: publishContext}, nil
268301
}
269302

303+
ctxzap.Extract(ctx).Sugar().Infow("Attaching volume to node",
304+
"volumeID", volumeID,
305+
"nodeID", nodeID,
306+
)
307+
270308
deviceID, err := cs.connector.AttachVolume(ctx, volumeID, nodeID)
271309
if err != nil {
272310
return nil, status.Errorf(codes.Internal, "Cannot attach volume %s: %s", volumeID, err.Error())
273311
}
274312

313+
ctxzap.Extract(ctx).Sugar().Infow("Attached volume to node successfully",
314+
"volumeID", volumeID,
315+
"nodeID", nodeID,
316+
)
317+
275318
publishContext := map[string]string{
276319
deviceIDContextKey: deviceID,
277320
}
@@ -302,17 +345,32 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
302345

303346
// Check VM existence
304347
if _, err := cs.connector.GetVMByID(ctx, nodeID); err == cloud.ErrNotFound {
305-
return nil, status.Errorf(codes.NotFound, "VM %v not found", nodeID)
348+
// volumes cannot be attached to deleted VMs
349+
ctxzap.Extract(ctx).Sugar().Warnw("VM not found, marking ControllerUnpublishVolume successful",
350+
"volumeID", volumeID,
351+
"nodeID", nodeID,
352+
)
353+
return &csi.ControllerUnpublishVolumeResponse{}, nil
306354
} else if err != nil {
307355
// Error with CloudStack
308356
return nil, status.Errorf(codes.Internal, "Error %v", err)
309357
}
310358

359+
ctxzap.Extract(ctx).Sugar().Infow("Detaching volume from node",
360+
"volumeID", volumeID,
361+
"nodeID", nodeID,
362+
)
363+
311364
err := cs.connector.DetachVolume(ctx, volumeID)
312365
if err != nil {
313366
return nil, status.Errorf(codes.Internal, "Cannot detach volume %s: %s", volumeID, err.Error())
314367
}
315368

369+
ctxzap.Extract(ctx).Sugar().Infow("Detached volume from node successfully",
370+
"volumeID", volumeID,
371+
"nodeID", nodeID,
372+
)
373+
316374
return &csi.ControllerUnpublishVolumeResponse{}, nil
317375
}
318376

@@ -334,13 +392,16 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req
334392
return nil, status.Errorf(codes.Internal, "Error %v", err)
335393
}
336394

337-
var confirmed *csi.ValidateVolumeCapabilitiesResponse_Confirmed
338-
if isValidVolumeCapabilities(volCaps) {
339-
confirmed = &csi.ValidateVolumeCapabilitiesResponse_Confirmed{VolumeCapabilities: volCaps}
395+
if !isValidVolumeCapabilities(volCaps) {
396+
return &csi.ValidateVolumeCapabilitiesResponse{Message: "Requested VolumeCapabilities are invalid"}, nil
340397
}
398+
341399
return &csi.ValidateVolumeCapabilitiesResponse{
342-
Confirmed: confirmed,
343-
}, nil
400+
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
401+
VolumeContext: req.GetVolumeContext(),
402+
VolumeCapabilities: volCaps,
403+
Parameters: req.GetParameters(),
404+
}}, nil
344405
}
345406

346407
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {

0 commit comments

Comments (0)