
Commit fd79a2d

Introduced parallelism for the SAN (FCP) driver in the node part of the Trident codebase
1 parent: 709f6a0

File tree

4 files changed: +1498, -137 lines changed


frontend/csi/node_server.go

Lines changed: 37 additions & 20 deletions
```diff
@@ -72,6 +72,9 @@ const (
     maxNodeStageISCSIVolumeOperations = 5
     maxNodeUnstageISCSIVolumeOperations = 10
     maxNodePublishISCSIVolumeOperations = 10
+    maxNodeStageFCPVolumeOperations = 5
+    maxNodeUnstageFCPVolumeOperations = 10
+    maxNodePublishFCPVolumeOperations = 10
     maxNodeExpandVolumeOperations = 10
 
     NodeStageNFSVolume = "NodeStageNFSVolume"
```
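
The three new caps bound how many FCP stage, unstage, and publish operations may run concurrently on a node, mirroring the existing iSCSI limits. A minimal sketch of the bounded-concurrency idea behind such caps, modeled with golang.org/x/sync/semaphore because Trident's own limiter type is not shown in this diff:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

// Illustrative cap, mirroring maxNodeStageFCPVolumeOperations above.
const maxStageOps = 5

func main() {
	limiter := semaphore.NewWeighted(maxStageOps)
	ctx := context.Background()

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// At most maxStageOps goroutines pass this point at once;
			// the rest queue until a slot is released.
			if err := limiter.Acquire(ctx, 1); err != nil {
				return
			}
			defer limiter.Release(1)
			fmt.Printf("staging FCP volume %d\n", id)
		}(i)
	}
	wg.Wait()
}
```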
```diff
@@ -86,6 +89,11 @@ const (
     NodeUnstageISCSIVolume = "NodeUnstageISCSIVolume"
     NodePublishISCSIVolume = "NodePublishISCSIVolume"
 
+    // FCP Constants
+    NodeStageFCPVolume = "NodeStageFCPVolume"
+    NodeUnstageFCPVolume = "NodeUnstageFCPVolume"
+    NodePublishFCPVolume = "NodePublishFCPVolume"
+
     NodeUnpublishVolume = "NodeUnpublishVolume"
     NodeExpandVolume = "NodeExpandVolume"
 
```
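These names key the per-operation limiters that the FCP handlers below consult. A hypothetical sketch of how such a map could be wired up, using x/sync/semaphore as a stand-in for Trident's limiter type (the real construction of limiterSharedMap is not part of this diff):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

const (
	NodeStageFCPVolume   = "NodeStageFCPVolume"
	NodeUnstageFCPVolume = "NodeUnstageFCPVolume"
	NodePublishFCPVolume = "NodePublishFCPVolume"
)

// One limiter per operation name, sized by the caps added earlier.
var limiterSharedMap = map[string]*semaphore.Weighted{
	NodeStageFCPVolume:   semaphore.NewWeighted(5),
	NodeUnstageFCPVolume: semaphore.NewWeighted(10),
	NodePublishFCPVolume: semaphore.NewWeighted(10),
}

func main() {
	ctx := context.Background()
	if err := limiterSharedMap[NodeStageFCPVolume].Acquire(ctx, 1); err != nil {
		return
	}
	defer limiterSharedMap[NodeStageFCPVolume].Release(1)
	fmt.Println("stage admitted under the per-operation cap")
}
```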
```diff
@@ -562,8 +570,8 @@ func (p *Plugin) NodeExpandVolume(
     }
 
     {
-        // TODO(pshashan): Remove this POC once the FCP and NVMe protocols are parallelized.
-        // It currently enables parallelization for the iSCSI protocol while keeping FCP and NVMe serialized.
+        // TODO(pshashan): Remove this POC once the NVMe protocol is parallelized.
+        // It currently enables parallelization for the iSCSI and FCP protocols while keeping NVMe serialized.
         protocol, err := getVolumeProtocolFromPublishInfo(&trackingInfo.VolumePublishInfo)
         if err != nil {
             return nil, status.Errorf(codes.FailedPrecondition, "unable to read protocol info from publish info; %s", err)
@@ -572,7 +580,7 @@ func (p *Plugin) NodeExpandVolume(
         lockContext := "NodeExpandVolume"
         if protocol == tridentconfig.Block {
             switch trackingInfo.VolumePublishInfo.SANType {
-            case sa.NVMe, sa.FCP:
+            case sa.NVMe:
                 lockID = nodeLockID
                 lockContext = "NodeExpandVolume-" + req.GetVolumeId()
             }
```
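
The net effect of these two hunks: during NodeExpandVolume, only NVMe block volumes still take the node-wide lock, while iSCSI and FCP volumes lock at a narrower scope and can expand in parallel. A self-contained toy model of that scoping decision (keyedLocks, lockByID, and expandVolume are illustrative names, not Trident APIs):

```go
package main

import (
	"fmt"
	"sync"
)

var keyedLocks sync.Map // lock ID -> *sync.Mutex

func lockByID(id string) *sync.Mutex {
	m, _ := keyedLocks.LoadOrStore(id, &sync.Mutex{})
	return m.(*sync.Mutex)
}

func expandVolume(sanType, volumeID string) {
	lockID := "perVolume-" + volumeID // iSCSI and FCP: parallel across volumes
	if sanType == "NVMe" {
		lockID = "nodeLock" // NVMe: still serialized node-wide
	}
	l := lockByID(lockID)
	l.Lock()
	defer l.Unlock()
	fmt.Println("expanding", volumeID, "under lock", lockID)
}

func main() {
	var wg sync.WaitGroup
	for _, v := range []string{"vol1", "vol2"} {
		wg.Add(1)
		go func(v string) { defer wg.Done(); expandVolume("FCP", v) }(v)
	}
	wg.Wait()
}
```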
```diff
@@ -1303,16 +1311,14 @@ func (p *Plugin) readAllTrackingFiles(ctx context.Context) []models.VolumePublishInfo
 func (p *Plugin) nodeStageFCPVolume(
     ctx context.Context, req *csi.NodeStageVolumeRequest, publishInfo *models.VolumePublishInfo,
 ) (err error) {
-    // Serializing all the parallel requests by relying on the constant var.
-    lockContext := "NodeStageSANFCPVolume-" + req.GetVolumeId()
-    defer locks.Unlock(ctx, lockContext, nodeLockID)
-    if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
-        return status.Error(codes.Aborted, "request waited too long for the lock")
-    }
-
     Logc(ctx).Debug(">>>> nodeStageFCPVolume")
     defer Logc(ctx).Debug("<<<< nodeStageFCPVolume")
 
+    if err = p.limiterSharedMap[NodeStageFCPVolume].Wait(ctx); err != nil {
+        return err
+    }
+    defer p.limiterSharedMap[NodeStageFCPVolume].Release(ctx)
+
     var lunID int32
     lunID, err = convert.ToPositiveInt32(req.PublishContext["fcpLunNumber"])
     if err != nil {
```
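
Where the removed attemptLock path failed with codes.Aborted after a fixed timeout, the limiter's Wait is context-driven: a caller whose context is cancelled or expires while queued gets the context error back instead of blocking forever. A small sketch of that behavior, again modeled on x/sync/semaphore since Wait/Release are all this diff reveals of the limiter's API:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	limiter := semaphore.NewWeighted(1)
	ctx := context.Background()

	// Hold the only slot so the next caller must queue.
	_ = limiter.Acquire(ctx, 1)

	// A queued caller whose context expires gives up with ctx.Err()
	// rather than waiting indefinitely.
	waitCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()
	if err := limiter.Acquire(waitCtx, 1); err != nil {
		fmt.Println("wait aborted:", err) // context deadline exceeded
	}
}
```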
```diff
@@ -1607,12 +1613,13 @@ func (p *Plugin) nodeUnstageFCPVolume(
 func (p *Plugin) nodeUnstageFCPVolumeRetry(
     ctx context.Context, req *csi.NodeUnstageVolumeRequest, publishInfo *models.VolumePublishInfo, force bool,
 ) (*csi.NodeUnstageVolumeResponse, error) {
-    // Serializing all the parallel requests by relying on the constant var.
-    lockContext := "NodeUnstageFCPVolume-" + req.GetVolumeId()
-    defer locks.Unlock(ctx, lockContext, nodeLockID)
-    if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
-        return nil, status.Error(codes.Aborted, "request waited too long for the lock")
+    Logc(ctx).Debug(">>>> nodeUnstageFCPVolumeRetry")
+    defer Logc(ctx).Debug("<<<< nodeUnstageFCPVolumeRetry")
+
+    if err := p.limiterSharedMap[NodeUnstageFCPVolume].Wait(ctx); err != nil {
+        return nil, err
     }
+    defer p.limiterSharedMap[NodeUnstageFCPVolume].Release(ctx)
 
     nodeUnstageFCPVolumeNotify := func(err error, duration time.Duration) {
         Logc(ctx).WithField("increment", duration).Debug("Failed to unstage the volume, retrying.")
```
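
The nodeUnstageFCPVolumeNotify closure has the signature of a backoff notify callback, invoked between attempts with the last error and the next wait interval. A hedged sketch of the kind of retry loop it plugs into, using github.com/cenkalti/backoff/v4; the operation body and retry settings here are assumptions, not the commit's actual values:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	attempts := 0
	unstage := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("device still busy")
		}
		return nil
	}

	// Called between attempts, mirroring nodeUnstageFCPVolumeNotify above.
	notify := func(err error, duration time.Duration) {
		fmt.Printf("failed to unstage (%v), retrying in %v\n", err, duration)
	}

	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 10 * time.Second

	if err := backoff.RetryNotify(unstage, b, notify); err != nil {
		fmt.Println("giving up:", err)
	}
}
```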
```diff
@@ -1641,12 +1648,13 @@ func (p *Plugin) nodeUnstageFCPVolumeRetry(
 func (p *Plugin) nodePublishFCPVolume(
     ctx context.Context, req *csi.NodePublishVolumeRequest,
 ) (*csi.NodePublishVolumeResponse, error) {
-    // Serializing all the parallel requests by relying on the constant var.
-    lockContext := "NodePublishFCPVolume-" + req.GetVolumeId()
-    defer locks.Unlock(ctx, lockContext, nodeLockID)
-    if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
-        return nil, status.Error(codes.Aborted, "request waited too long for the lock")
+    Logc(ctx).Debug(">>>> nodePublishFCPVolume")
+    defer Logc(ctx).Debug("<<<< nodePublishFCPVolume")
+
+    if err := p.limiterSharedMap[NodePublishFCPVolume].Wait(ctx); err != nil {
+        return nil, err
     }
+    defer p.limiterSharedMap[NodePublishFCPVolume].Release(ctx)
 
     var err error
 
```
```diff
@@ -2968,6 +2976,9 @@ func (p *Plugin) nodeStageNVMeVolume(
     ctx context.Context, req *csi.NodeStageVolumeRequest,
     publishInfo *models.VolumePublishInfo,
 ) error {
+    Logc(ctx).Debug(">>>> nodeStageNVMeVolume")
+    defer Logc(ctx).Debug("<<<< nodeStageNVMeVolume")
+
     // Serializing all the parallel requests by relying on the constant var.
     lockContext := "NodeStageSANNVMeVolume-" + req.GetVolumeId()
     defer locks.Unlock(ctx, lockContext, nodeLockID)
@@ -3041,6 +3052,9 @@ func (p *Plugin) nodeStageNVMeVolume(
 func (p *Plugin) nodeUnstageNVMeVolume(
     ctx context.Context, req *csi.NodeUnstageVolumeRequest, publishInfo *models.VolumePublishInfo, force bool,
 ) (*csi.NodeUnstageVolumeResponse, error) {
+    Logc(ctx).Debug(">>>> nodeUnstageNVMeVolume")
+    defer Logc(ctx).Debug("<<<< nodeUnstageNVMeVolume")
+
     // Serializing all the parallel requests by relying on the constant var.
     lockContext := "NodeUnstageNVMeVolume-" + req.GetVolumeId()
     defer locks.Unlock(ctx, lockContext, nodeLockID)
@@ -3184,6 +3198,9 @@ func (p *Plugin) nodeUnstageNVMeVolume(
 func (p *Plugin) nodePublishNVMeVolume(
     ctx context.Context, req *csi.NodePublishVolumeRequest,
 ) (*csi.NodePublishVolumeResponse, error) {
+    Logc(ctx).Debug(">>>> nodePublishNVMeVolume")
+    defer Logc(ctx).Debug("<<<< nodePublishNVMeVolume")
+
     // Serializing all the parallel requests by relying on the constant var.
     lockContext := "NodePublishNVMeVolume-" + req.GetVolumeId()
     defer locks.Unlock(ctx, lockContext, nodeLockID)
```