Skip to content

Commit ff3d7cf

Browse files
Introduced parallelism for NAS Driver in node part of the Trident codebase
1 parent a92cb7e commit ff3d7cf

File tree

11 files changed

+943
-134
lines changed

11 files changed

+943
-134
lines changed

frontend/csi/node_server.go

Lines changed: 204 additions & 34 deletions
Large diffs are not rendered by default.

frontend/csi/node_server_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,9 +1047,9 @@ func TestFixISCSISessions(t *testing.T) {
10471047
portals := getPortals(input.PublishedPortals, input.PortalActions)
10481048

10491049
if input.AddNewNodeOps {
1050-
go utils.Lock(ctx, "test-lock1", lockID)
1050+
go utils.Lock(ctx, "test-lock1", nodeLockID)
10511051
snooze(10)
1052-
go utils.Lock(ctx, "test-lock2", lockID)
1052+
go utils.Lock(ctx, "test-lock2", nodeLockID)
10531053
snooze(10)
10541054
}
10551055

@@ -1074,16 +1074,16 @@ func TestFixISCSISessions(t *testing.T) {
10741074
}
10751075

10761076
if input.AddNewNodeOps {
1077-
utils.Unlock(ctx, "test-lock1", lockID)
1077+
utils.Unlock(ctx, "test-lock1", nodeLockID)
10781078

10791079
// Wait for the lock to be released
1080-
for utils.WaitQueueSize(lockID) > 1 {
1080+
for utils.WaitQueueSize(nodeLockID) > 1 {
10811081
snooze(10)
10821082
}
10831083

10841084
// Give some time for another context to acquire the lock
10851085
snooze(100)
1086-
utils.Unlock(ctx, "test-lock2", lockID)
1086+
utils.Unlock(ctx, "test-lock2", nodeLockID)
10871087
}
10881088
})
10891089
}
@@ -1599,23 +1599,23 @@ func TestAttemptLock_Failure(t *testing.T) {
15991599
lockContext := "fakeLockContext-req1"
16001600
lockTimeout := 200 * time.Millisecond
16011601
// first request takes the lock
1602-
expected := attemptLock(ctx, lockContext, lockTimeout)
1602+
expected := attemptLock(ctx, lockContext, nodeLockID, lockTimeout)
16031603

16041604
// start the second request so that it is in race for the lock
16051605
go func() {
16061606
defer wg.Done()
16071607
ctx := context.Background()
16081608
lockContext := "fakeLockContext-req2"
1609-
expected := attemptLock(ctx, lockContext, lockTimeout)
1609+
expected := attemptLock(ctx, lockContext, nodeLockID, lockTimeout)
16101610

16111611
assert.False(t, expected)
1612-
utils.Unlock(ctx, lockContext, lockID)
1612+
utils.Unlock(ctx, lockContext, nodeLockID)
16131613
}()
16141614
// first request goes to sleep holding the lock
16151615
if expected {
16161616
time.Sleep(500 * time.Millisecond)
16171617
}
1618-
utils.Unlock(ctx, lockContext, lockID)
1618+
utils.Unlock(ctx, lockContext, nodeLockID)
16191619
wg.Wait()
16201620
}
16211621

@@ -1632,7 +1632,7 @@ func TestAttemptLock_Success(t *testing.T) {
16321632
lockContext := "fakeLockContext-req1"
16331633
lockTimeout := 500 * time.Millisecond
16341634
// first request takes the lock
1635-
expected := attemptLock(ctx, lockContext, lockTimeout)
1635+
expected := attemptLock(ctx, lockContext, nodeLockID, lockTimeout)
16361636

16371637
// start the second request so that it is in race for the lock
16381638
go func() {
@@ -1641,16 +1641,16 @@ func TestAttemptLock_Success(t *testing.T) {
16411641
lockContext := "fakeLockContext-req2"
16421642
lockTimeout := 5 * time.Second
16431643

1644-
expected := attemptLock(ctx, lockContext, lockTimeout)
1644+
expected := attemptLock(ctx, lockContext, nodeLockID, lockTimeout)
16451645

16461646
assert.True(t, expected)
1647-
utils.Unlock(ctx, lockContext, lockID)
1647+
utils.Unlock(ctx, lockContext, nodeLockID)
16481648
}()
16491649
// first request goes to sleep holding the lock
16501650
if expected {
16511651
time.Sleep(200 * time.Millisecond)
16521652
}
1653-
utils.Unlock(ctx, lockContext, lockID)
1653+
utils.Unlock(ctx, lockContext, nodeLockID)
16541654
wg.Wait()
16551655
}
16561656

frontend/csi/plugin.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/netapp/trident/utils/fcp"
2828
"github.com/netapp/trident/utils/filesystem"
2929
"github.com/netapp/trident/utils/iscsi"
30+
"github.com/netapp/trident/utils/limiter"
3031
"github.com/netapp/trident/utils/models"
3132
"github.com/netapp/trident/utils/mount"
3233
"github.com/netapp/trident/utils/nvme"
@@ -73,6 +74,8 @@ type Plugin struct {
7374

7475
nodeIsRegistered bool
7576

77+
limiterSharedMap map[string]limiter.Limiter
78+
7679
iSCSISelfHealingTicker *time.Ticker
7780
iSCSISelfHealingChannel chan struct{}
7881
iSCSISelfHealingInterval time.Duration
@@ -196,6 +199,7 @@ func NewNodePlugin(
196199
enableForceDetach: enableForceDetach,
197200
unsafeDetach: unsafeDetach,
198201
opCache: sync.Map{},
202+
limiterSharedMap: make(map[string]limiter.Limiter),
199203
iSCSISelfHealingInterval: iSCSISelfHealingInterval,
200204
iSCSISelfHealingWaitTime: iSCSIStaleSessionWaitTime,
201205
nvmeHandler: nvme.NewNVMeHandler(),
@@ -306,6 +310,7 @@ func NewAllInOnePlugin(
306310
controllerHelper: *controllerHelper,
307311
nodeHelper: *nodeHelper,
308312
opCache: sync.Map{},
313+
limiterSharedMap: make(map[string]limiter.Limiter),
309314
iSCSISelfHealingInterval: iSCSISelfHealingInterval,
310315
iSCSISelfHealingWaitTime: iSCSIStaleSessionWaitTime,
311316
nvmeHandler: nvme.NewNVMeHandler(),
@@ -395,6 +400,9 @@ func (p *Plugin) Activate() error {
395400
p.startISCSISelfHealingThread(ctx)
396401
p.startNVMeSelfHealingThread(ctx)
397402

403+
// Initialize node scalability limiter.
404+
p.InitializeNodeLimiter(ctx)
405+
398406
if p.enableForceDetach {
399407
p.startReconcilingNodePublications(ctx)
400408
}
@@ -618,3 +626,67 @@ func (p *Plugin) stopNVMeSelfHealingThread(_ context.Context) {
618626

619627
return
620628
}
629+
630+
// InitializeNodeLimiter initializes the sharedMapLimiter of node with various csi workflow related limiters.
631+
// This function sets up limiters for different volume operations such as staging, unstaging,
632+
// publishing, and unpublishing NFS and SMB volumes. Each limiter is configured with a semaphoreN
633+
// to control the maximum number of concurrent operations allowed for each type of volume operation.
634+
func (p *Plugin) InitializeNodeLimiter(ctx context.Context) {
635+
var err error
636+
637+
if p.limiterSharedMap[NodeStageNFSVolume], err = limiter.New(ctx,
638+
NodeStageNFSVolume,
639+
limiter.TypeSemaphoreN,
640+
limiter.WithSemaphoreNSize(ctx, maxNodeStageNFSVolumeOperations),
641+
); err != nil {
642+
Logc(ctx).Fatalf("Failed to initialize limiter for %s: %v", NodeStageNFSVolume, err)
643+
}
644+
645+
if p.limiterSharedMap[NodeStageSMBVolume], err = limiter.New(ctx,
646+
NodeStageSMBVolume,
647+
limiter.TypeSemaphoreN,
648+
limiter.WithSemaphoreNSize(ctx, maxNodeStageSMBVolumeOperations),
649+
); err != nil {
650+
Logc(ctx).Fatalf("Failed to initialize limiter for %s: %v", NodeStageSMBVolume, err)
651+
}
652+
653+
if p.limiterSharedMap[NodeUnstageNFSVolume], err = limiter.New(ctx,
654+
NodeUnstageNFSVolume,
655+
limiter.TypeSemaphoreN,
656+
limiter.WithSemaphoreNSize(ctx, maxNodeUnstageNFSVolumeOperations),
657+
); err != nil {
658+
Logc(ctx).Fatalf("Failed to initialize limiter for %s: %v", NodeUnstageNFSVolume, err)
659+
}
660+
661+
if p.limiterSharedMap[NodeUnstageSMBVolume], err = limiter.New(ctx,
662+
NodeUnstageSMBVolume,
663+
limiter.TypeSemaphoreN,
664+
limiter.WithSemaphoreNSize(ctx, maxNodeUnstageSMBVolumeOperations),
665+
); err != nil {
666+
Logc(ctx).Fatalf("Failed to initialize limiter for %s: %v", NodeUnstageSMBVolume, err)
667+
}
668+
669+
if p.limiterSharedMap[NodePublishNFSVolume], err = limiter.New(ctx,
670+
NodePublishNFSVolume,
671+
limiter.TypeSemaphoreN,
672+
limiter.WithSemaphoreNSize(ctx, maxNodePublishNFSVolumeOperations),
673+
); err != nil {
674+
Logc(ctx).Fatalf("Failed to initialize limiter for %s: %v", NodePublishNFSVolume, err)
675+
}
676+
677+
if p.limiterSharedMap[NodePublishSMBVolume], err = limiter.New(ctx,
678+
NodePublishSMBVolume,
679+
limiter.TypeSemaphoreN,
680+
limiter.WithSemaphoreNSize(ctx, maxNodePublishSMBVolumeOperations),
681+
); err != nil {
682+
Logc(ctx).Fatalf("Failed to initialize limiter for %s: %v", NodePublishSMBVolume, err)
683+
}
684+
685+
if p.limiterSharedMap[NodeUnpublishVolume], err = limiter.New(ctx,
686+
NodeUnpublishVolume,
687+
limiter.TypeSemaphoreN,
688+
limiter.WithSemaphoreNSize(ctx, maxNodeUnpublishVolumeOperations),
689+
); err != nil {
690+
Logc(ctx).Fatalf("Failed to initialize limiter for %s: %v", NodeUnpublishVolume, err)
691+
}
692+
}

frontend/csi/volume_publish_manager.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ import (
1212
"strings"
1313

1414
"github.com/spf13/afero"
15+
"google.golang.org/grpc/codes"
16+
"google.golang.org/grpc/status"
1517

1618
"github.com/netapp/trident/config"
1719
"github.com/netapp/trident/internal/fiji"
1820
. "github.com/netapp/trident/logging"
21+
"github.com/netapp/trident/utils"
1922
"github.com/netapp/trident/utils/errors"
2023
"github.com/netapp/trident/utils/filesystem"
2124
"github.com/netapp/trident/utils/models"
@@ -95,6 +98,13 @@ func (v *VolumePublishManager) WriteTrackingInfo(
9598
return err
9699
}
97100

101+
// we can have locks on filename itself.
102+
lockContext := "WriteTrackingInfo"
103+
defer utils.Unlock(ctx, lockContext, filename)
104+
if !attemptLock(ctx, lockContext, filename, csiNodeLockTimeout) {
105+
return status.Error(codes.Aborted, "request waited too long for the lock")
106+
}
107+
98108
err := jsonRW.WriteJSONFile(ctx, trackingInfo, tmpFile, "volume tracking info")
99109
if err != nil {
100110
return err
@@ -133,6 +143,11 @@ func (v *VolumePublishManager) readTrackingInfo(
133143
return nil, err
134144
}
135145

146+
lockContext := "ReadTrackingInfo"
147+
defer utils.Unlock(ctx, lockContext, filename)
148+
if !attemptLock(ctx, lockContext, filename, csiNodeLockTimeout) {
149+
return nil, status.Error(codes.Aborted, "request waited too long for the lock on tracking file")
150+
}
136151
err := jsonRW.ReadJSONFile(ctx, &volumeTrackingInfo, path.Join(v.volumeTrackingInfoPath, filename),
137152
"volume tracking info")
138153
if err != nil {

mocks/mock_utils/mock_limiter/mock_limiter.go

Lines changed: 66 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

utils/limiter/limiter.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package limiter
2+
3+
//go:generate mockgen -destination=../../mocks/mock_utils/mock_limiter/mock_limiter.go -package=mock_limiter github.com/netapp/trident/utils/limiter Limiter
4+
5+
import (
6+
"context"
7+
"fmt"
8+
9+
. "github.com/netapp/trident/logging"
10+
)
11+
12+
type Limiter interface {
13+
Wait(ctx context.Context) error
14+
Release(ctx context.Context)
15+
}
16+
17+
// LimiterOption Using an option pattern to generalize New function.
18+
type LimiterOption func(limiter Limiter) error
19+
20+
type LimiterType uint16
21+
22+
const (
23+
TypeSemaphoreN LimiterType = iota
24+
)
25+
26+
// New creates a limiter based on the provided limiterType.
27+
// Note: This function is not thread-safe, so precautions are necessary for concurrent use.
28+
// Ensure the limiter type and options are correctly matched to avoid errors.
29+
func New(ctx context.Context, limID string, limiterType LimiterType, options ...LimiterOption) (Limiter, error) {
30+
logFields := LogFields{
31+
"LimiterID": limID,
32+
"LimiterType": limiterType,
33+
"Options": options,
34+
}
35+
Logc(ctx).WithFields(logFields).Debug(">>>> Limiter.New")
36+
defer Logc(ctx).WithFields(logFields).Debug("<<<< Limiter.New")
37+
38+
var limiter Limiter
39+
40+
// We'll create the new Limiter, depending upon the type passed in limiterType arg.
41+
switch limiterType {
42+
case TypeSemaphoreN:
43+
limiter = newSemaphoreN(limID)
44+
45+
default:
46+
return nil, fmt.Errorf("unknown limiter type: %T", limiterType)
47+
}
48+
49+
// Applying the limiter options.
50+
for _, option := range options {
51+
if err := option(limiter); err != nil {
52+
return nil, err
53+
}
54+
}
55+
56+
Logc(ctx).WithFields(logFields).Debugf("Limiter with limID %s was created", limID)
57+
58+
return limiter, nil
59+
}

0 commit comments

Comments
 (0)