44 "context"
55 "fmt"
66 "math/rand"
7+ "sync"
78
89 "github.com/container-storage-interface/spec/lib/go/csi"
910 "google.golang.org/grpc/codes"
@@ -24,12 +25,14 @@ var onlyVolumeCapAccessMode = csi.VolumeCapability_AccessMode{
2425type controllerServer struct {
2526 csi.UnimplementedControllerServer
2627 connector cloud.Interface
28+ locks map [string ]* sync.Mutex
2729}
2830
2931// NewControllerServer creates a new Controller gRPC server.
3032func NewControllerServer (connector cloud.Interface ) csi.ControllerServer {
3133 return & controllerServer {
3234 connector : connector ,
35+ locks : make (map [string ]* sync.Mutex ),
3336 }
3437}
3538
@@ -215,6 +218,15 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
215218 }
216219 nodeID := req .GetNodeId ()
217220
221+ //Ensure only one node is processing at same time
222+ lock , ok := cs .locks [nodeID ]
223+ if ! ok {
224+ lock = & sync.Mutex {}
225+ cs .locks [nodeID ] = lock
226+ }
227+ lock .Lock ()
228+ defer lock .Unlock ()
229+
218230 if req .GetReadonly () {
219231 return nil , status .Error (codes .InvalidArgument , "Readonly not possible" )
220232 }
@@ -236,7 +248,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
236248 }
237249
238250 if vol .VirtualMachineID != "" && vol .VirtualMachineID != nodeID {
239- return nil , status .Error (codes .AlreadyExists , "Volume already assigned" )
251+ return nil , status .Error (codes .AlreadyExists , "Volume already assigned to another node " )
240252 }
241253
242254 if _ , err := cs .connector .GetVMByID (ctx , nodeID ); err == cloud .ErrNotFound {
@@ -274,14 +286,21 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
274286 }
275287 volumeID := req .GetVolumeId ()
276288
289+ // TODO: according to the spec, node_id is allowed to be empty:
290+ //
291+ // If the value is set, the SP MUST unpublish the volume from
292+ // the specified node. If the value is unset, the SP MUST unpublish
293+ // the volume from all nodes it is published to.
277294 if req .GetNodeId () == "" {
278295 return nil , status .Error (codes .InvalidArgument , "Node ID missing in request" )
279296 }
280297 nodeID := req .GetNodeId ()
281298
282299 // Check volume
283300 if vol , err := cs .connector .GetVolumeByID (ctx , volumeID ); err == cloud .ErrNotFound {
284- return nil , status .Errorf (codes .NotFound , "Volume %v not found" , volumeID )
301+ // Volume does not exist in CloudStack. We can safely assume this volume is no longer attached
302+ // The spec requires us to return OK here
303+ return & csi.ControllerUnpublishVolumeResponse {}, nil
285304 } else if err != nil {
286305 // Error with CloudStack
287306 return nil , status .Errorf (codes .Internal , "Error %v" , err )
@@ -291,15 +310,8 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
291310 }
292311
293312 // Check VM existence
294- if _ , err := cs .connector .GetVolumeByID (ctx , volumeID ); err == cloud .ErrNotFound {
295- return nil , status .Errorf (codes .NotFound , "Volume %v not found" , volumeID )
296- } else if err != nil {
297- // Error with CloudStack
298- return nil , status .Errorf (codes .Internal , "Error %v" , err )
299- }
300-
301313 if _ , err := cs .connector .GetVMByID (ctx , nodeID ); err == cloud .ErrNotFound {
302- return nil , status .Errorf (codes .NotFound , "VM %v not found" , volumeID )
314+ return nil , status .Errorf (codes .NotFound , "VM %v not found" , nodeID )
303315 } else if err != nil {
304316 // Error with CloudStack
305317 return nil , status .Errorf (codes .Internal , "Error %v" , err )
0 commit comments