
Commit 308dcd7

Move utils/locks.go to /pkg
1 parent af840a4 commit 308dcd7

6 files changed: +48, -47 lines
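
The change itself is mechanical: the lock helpers move out of the catch-all utils package into a dedicated pkg/locks package, and every call site swaps the import path and the package qualifier. A minimal before/after sketch of a caller, assuming the Trident module is on the import path (doNodeOperation and volumeID are placeholder names, not symbols from this commit):

package example

// Before this commit, callers reached the lock helpers through the utils package:
//
//     import "github.com/netapp/trident/utils"
//     utils.Lock(ctx, "NodeStageVolume", volumeID)
//     defer utils.Unlock(ctx, "NodeStageVolume", volumeID)
//
// After this commit, the same helpers come from pkg/locks:

import (
	"context"

	"github.com/netapp/trident/pkg/locks"
)

func doNodeOperation(ctx context.Context, volumeID string) {
	locks.Lock(ctx, "NodeStageVolume", volumeID)
	defer locks.Unlock(ctx, "NodeStageVolume", volumeID)
	// ... work that must be serialized per volume ...
}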

frontend/csi/node_server.go

Lines changed: 19 additions & 18 deletions

@@ -28,6 +28,7 @@ import (
 	. "github.com/netapp/trident/logging"
 	"github.com/netapp/trident/pkg/collection"
 	"github.com/netapp/trident/pkg/convert"
+	"github.com/netapp/trident/pkg/locks"
 	sa "github.com/netapp/trident/storage_attribute"
 	"github.com/netapp/trident/utils"
 	"github.com/netapp/trident/utils/devices"
@@ -105,7 +106,7 @@ const (
 
 func attemptLock(ctx context.Context, lockContext, lockID string, lockTimeout time.Duration) bool {
 	startTime := time.Now()
-	utils.Lock(ctx, lockContext, lockID)
+	locks.Lock(ctx, lockContext, lockID)
 	// Fail if the gRPC call came in a long time ago to avoid kubelet 120s timeout
 	if time.Since(startTime) > lockTimeout {
 		Logc(ctx).Debugf("Request spent more than %v in the queue and timed out", csiNodeLockTimeout)
@@ -127,7 +128,7 @@ func (p *Plugin) NodeStageVolume(
 	defer Logc(ctx).WithFields(fields).Debug("<<<< NodeStageVolume")
 
 	lockContext := "NodeStageVolume"
-	defer utils.Unlock(ctx, lockContext, req.GetVolumeId())
+	defer locks.Unlock(ctx, lockContext, req.GetVolumeId())
 	if !attemptLock(ctx, lockContext, req.GetVolumeId(), csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
 	}
@@ -181,7 +182,7 @@ func (p *Plugin) nodeUnstageVolume(
 	defer Logc(ctx).WithFields(fields).Debug("<<<< NodeUnstageVolume")
 
 	lockContext := "NodeUnstageVolume"
-	defer utils.Unlock(ctx, lockContext, req.GetVolumeId())
+	defer locks.Unlock(ctx, lockContext, req.GetVolumeId())
 	if !attemptLock(ctx, lockContext, req.GetVolumeId(), csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
 	}
@@ -269,7 +270,7 @@ func (p *Plugin) NodePublishVolume(
 	defer Logc(ctx).WithFields(fields).Debug("<<<< NodePublishVolume")
 
 	lockContext := "NodePublishVolume"
-	defer utils.Unlock(ctx, lockContext, req.GetVolumeId())
+	defer locks.Unlock(ctx, lockContext, req.GetVolumeId())
 	if !attemptLock(ctx, lockContext, req.GetVolumeId(), csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
 	}
@@ -326,7 +327,7 @@ func (p *Plugin) NodeUnpublishVolume(
 	defer Logc(ctx).WithFields(fields).Debug("<<<< NodeUnpublishVolume")
 
 	lockContext := "NodeUnpublishVolume"
-	defer utils.Unlock(ctx, lockContext, req.GetVolumeId())
+	defer locks.Unlock(ctx, lockContext, req.GetVolumeId())
 	if !attemptLock(ctx, lockContext, req.GetVolumeId(), csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
 	}
@@ -1553,7 +1554,7 @@ func (p *Plugin) nodeUnstageFCPVolumeRetry(
 ) (*csi.NodeUnstageVolumeResponse, error) {
 	// Serializing all the parallel requests by relying on the constant var.
 	lockContext := "NodeUnstageFCPVolume-" + req.GetVolumeId()
-	defer utils.Unlock(ctx, lockContext, nodeLockID)
+	defer locks.Unlock(ctx, lockContext, nodeLockID)
 
 	if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
@@ -1588,7 +1589,7 @@ func (p *Plugin) nodePublishFCPVolume(
 ) (*csi.NodePublishVolumeResponse, error) {
 	// Serializing all the parallel requests by relying on the constant var.
 	lockContext := "NodePublishFCPVolume-" + req.GetVolumeId()
-	defer utils.Unlock(ctx, lockContext, nodeLockID)
+	defer locks.Unlock(ctx, lockContext, nodeLockID)
 
 	if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
@@ -2090,7 +2091,7 @@ func (p *Plugin) nodeUnstageISCSIVolumeRetry(
 ) (*csi.NodeUnstageVolumeResponse, error) {
 	// Serializing all the parallel requests by relying on the constant var.
 	lockContext := "NodeUnstageISCSIVolume-" + req.GetVolumeId()
-	defer utils.Unlock(ctx, lockContext, nodeLockID)
+	defer locks.Unlock(ctx, lockContext, nodeLockID)
 
 	if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
@@ -2125,7 +2126,7 @@ func (p *Plugin) nodePublishISCSIVolume(
 ) (*csi.NodePublishVolumeResponse, error) {
 	// Serializing all the parallel requests by relying on the constant var.
 	lockContext := "NodePublishISCSIVolume-" + req.GetVolumeId()
-	defer utils.Unlock(ctx, lockContext, nodeLockID)
+	defer locks.Unlock(ctx, lockContext, nodeLockID)
 
 	if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
@@ -2655,8 +2656,8 @@ func (p *Plugin) updateCHAPInfoForSessions(
 // performISCSISelfHealing inspects the desired state of the iSCSI sessions with the current state and accordingly
 // identifies candidate sessions that require remediation. This function is invoked periodically.
 func (p *Plugin) performISCSISelfHealing(ctx context.Context) {
-	utils.Lock(ctx, iSCSISelfHealingLockContext, nodeLockID)
-	defer utils.Unlock(ctx, iSCSISelfHealingLockContext, nodeLockID)
+	locks.Lock(ctx, iSCSISelfHealingLockContext, nodeLockID)
+	defer locks.Unlock(ctx, iSCSISelfHealingLockContext, nodeLockID)
 
 	defer func() {
 		if r := recover(); r != nil {
@@ -2742,7 +2743,7 @@ func (p *Plugin) fixISCSISessions(ctx context.Context, portals []string, portalT
 
 		// Check if there is a need to stop the loop from running
 		// NOTE: The loop should run at least once for all portal types.
-		if idx > 0 && utils.WaitQueueSize(nodeLockID) > 0 {
+		if idx > 0 && locks.WaitQueueSize(nodeLockID) > 0 {
 			// Check to see if some other operation(s) requires node lock, if not then continue to resolve
 			// non-stale iSCSI portal issues else break out of this loop.
 			if isNonStaleSessionFix {
@@ -2913,7 +2914,7 @@ func (p *Plugin) nodeUnstageNVMeVolume(
 ) (*csi.NodeUnstageVolumeResponse, error) {
 	// Serializing all the parallel requests by relying on the constant var.
 	lockContext := "NodeUnstageNVMeVolume-" + req.GetVolumeId()
-	defer utils.Unlock(ctx, lockContext, nodeLockID)
+	defer locks.Unlock(ctx, lockContext, nodeLockID)
 
 	if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
@@ -3057,7 +3058,7 @@ func (p *Plugin) nodePublishNVMeVolume(
 ) (*csi.NodePublishVolumeResponse, error) {
 	// Serializing all the parallel requests by relying on the constant var.
 	lockContext := "NodePublishNVMeVolume-" + req.GetVolumeId()
-	defer utils.Unlock(ctx, lockContext, nodeLockID)
+	defer locks.Unlock(ctx, lockContext, nodeLockID)
 
 	if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
@@ -3128,7 +3129,7 @@ func (p *Plugin) nodeStageSANVolume(
 ) (*csi.NodeStageVolumeResponse, error) {
 	// Serializing all the parallel requests by relying on the constant var.
 	lockContext := "NodeStageSanVolume-" + req.GetVolumeId()
-	defer utils.Unlock(ctx, lockContext, nodeLockID)
+	defer locks.Unlock(ctx, lockContext, nodeLockID)
 
 	if !attemptLock(ctx, lockContext, nodeLockID, csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock")
@@ -3190,8 +3191,8 @@ func (p *Plugin) nodeStageSANVolume(
 // performNVMeSelfHealing inspects the desired state of the NVMe sessions with the current state and accordingly
 // identifies candidate sessions that require remediation. This function is invoked periodically.
 func (p *Plugin) performNVMeSelfHealing(ctx context.Context) {
-	utils.Lock(ctx, nvmeSelfHealingLockContext, nodeLockID)
-	defer utils.Unlock(ctx, nvmeSelfHealingLockContext, nodeLockID)
+	locks.Lock(ctx, nvmeSelfHealingLockContext, nodeLockID)
+	defer locks.Unlock(ctx, nvmeSelfHealingLockContext, nodeLockID)
 
 	defer func() {
 		if r := recover(); r != nil {
@@ -3240,7 +3241,7 @@ func (p *Plugin) fixNVMeSessions(ctx context.Context, stopAt time.Time, subsyste
 		// 1. We should fix at least one subsystem in a single self-healing thread.
 		// 2. If there's another thread waiting for the node lock and if we have exceeded our 60 secs lock, we should
 		//    stop NVMe self-healing.
-		if index > 0 && utils.WaitQueueSize(nodeLockID) > 0 && time.Now().After(stopAt) {
+		if index > 0 && locks.WaitQueueSize(nodeLockID) > 0 && time.Now().After(stopAt) {
 			Logc(ctx).Info("Self-healing has exceeded maximum runtime; preempting NVMe session self-healing.")
 			break
 		}
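
One detail worth calling out in these hunks: attemptLock always ends up holding the lock, because locks.Lock blocks until the named lock is acquired; the boolean only reports whether the wait exceeded the timeout. That is why every handler defers locks.Unlock before calling attemptLock. A condensed sketch of the pattern, with placeholder names for anything not shown in this diff (handleNodeOp, and the 60-second timeout value):

package example

import (
	"context"
	"errors"
	"time"

	"github.com/netapp/trident/pkg/locks"
)

// Assumed value for illustration; the real csiNodeLockTimeout is defined elsewhere in node_server.go.
const csiNodeLockTimeout = 60 * time.Second

// attemptLock mirrors the helper in the diff: block until the named lock is
// held, then report whether the wait stayed within lockTimeout.
func attemptLock(ctx context.Context, lockContext, lockID string, lockTimeout time.Duration) bool {
	startTime := time.Now()
	locks.Lock(ctx, lockContext, lockID)
	return time.Since(startTime) <= lockTimeout
}

// handleNodeOp is a hypothetical gRPC handler body showing the defer-then-attempt ordering.
func handleNodeOp(ctx context.Context, volumeID string) error {
	lockContext := "NodeStageVolume"
	// Deferred first: even when attemptLock reports a timeout, the lock was acquired and must be released.
	defer locks.Unlock(ctx, lockContext, volumeID)
	if !attemptLock(ctx, lockContext, volumeID, csiNodeLockTimeout) {
		return errors.New("request waited too long for the lock")
	}
	// ... perform the node operation while holding the per-volume lock ...
	return nil
}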

frontend/csi/node_server_test.go

Lines changed: 10 additions & 10 deletions

@@ -34,8 +34,8 @@ import (
 	mock_nvme "github.com/netapp/trident/mocks/mock_utils/nvme"
 	"github.com/netapp/trident/pkg/collection"
 	"github.com/netapp/trident/pkg/convert"
+	"github.com/netapp/trident/pkg/locks"
 	sa "github.com/netapp/trident/storage_attribute"
-	"github.com/netapp/trident/utils"
 	"github.com/netapp/trident/utils/devices"
 	"github.com/netapp/trident/utils/errors"
 	"github.com/netapp/trident/utils/filesystem"
@@ -1263,9 +1263,9 @@ func TestFixISCSISessions(t *testing.T) {
 			portals := getPortals(input.PublishedPortals, input.PortalActions)
 
 			if input.AddNewNodeOps {
-				go utils.Lock(ctx, "test-lock1", nodeLockID)
+				go locks.Lock(ctx, "test-lock1", nodeLockID)
 				snooze(10)
-				go utils.Lock(ctx, "test-lock2", nodeLockID)
+				go locks.Lock(ctx, "test-lock2", nodeLockID)
 				snooze(10)
 			}
 
@@ -1290,16 +1290,16 @@ func TestFixISCSISessions(t *testing.T) {
 			}
 
 			if input.AddNewNodeOps {
-				utils.Unlock(ctx, "test-lock1", nodeLockID)
+				locks.Unlock(ctx, "test-lock1", nodeLockID)
 
 				// Wait for the lock to be released
-				for utils.WaitQueueSize(nodeLockID) > 1 {
+				for locks.WaitQueueSize(nodeLockID) > 1 {
 					snooze(10)
 				}
 
 				// Give some time for another context to acquire the lock
 				snooze(100)
-				utils.Unlock(ctx, "test-lock2", nodeLockID)
+				locks.Unlock(ctx, "test-lock2", nodeLockID)
 			}
 		})
 	}
@@ -1825,13 +1825,13 @@ func TestAttemptLock_Failure(t *testing.T) {
 		expected := attemptLock(ctx, lockContext, nodeLockID, lockTimeout)
 
 		assert.False(t, expected)
-		utils.Unlock(ctx, lockContext, nodeLockID)
+		locks.Unlock(ctx, lockContext, nodeLockID)
 	}()
 	// first request goes to sleep holding the lock
 	if expected {
 		time.Sleep(500 * time.Millisecond)
 	}
-	utils.Unlock(ctx, lockContext, nodeLockID)
+	locks.Unlock(ctx, lockContext, nodeLockID)
 	wg.Wait()
 }
 
@@ -1860,13 +1860,13 @@ func TestAttemptLock_Success(t *testing.T) {
 		expected := attemptLock(ctx, lockContext, nodeLockID, lockTimeout)
 
 		assert.True(t, expected)
-		utils.Unlock(ctx, lockContext, nodeLockID)
+		locks.Unlock(ctx, lockContext, nodeLockID)
	}()
 	// first request goes to sleep holding the lock
 	if expected {
 		time.Sleep(200 * time.Millisecond)
 	}
-	utils.Unlock(ctx, lockContext, nodeLockID)
+	locks.Unlock(ctx, lockContext, nodeLockID)
 	wg.Wait()
 }
 

frontend/csi/volume_publish_manager.go

Lines changed: 3 additions & 3 deletions

@@ -18,7 +18,7 @@ import (
 	"github.com/netapp/trident/config"
 	"github.com/netapp/trident/internal/fiji"
 	. "github.com/netapp/trident/logging"
-	"github.com/netapp/trident/utils"
+	"github.com/netapp/trident/pkg/locks"
 	"github.com/netapp/trident/utils/errors"
 	"github.com/netapp/trident/utils/filesystem"
 	"github.com/netapp/trident/utils/models"
@@ -100,7 +100,7 @@ func (v *VolumePublishManager) WriteTrackingInfo(
 
 	// we can have locks on filename itself.
 	lockContext := "WriteTrackingInfo"
-	defer utils.Unlock(ctx, lockContext, filename)
+	defer locks.Unlock(ctx, lockContext, filename)
 	if !attemptLock(ctx, lockContext, filename, csiNodeLockTimeout) {
 		return status.Error(codes.Aborted, "request waited too long for the lock")
 	}
@@ -144,7 +144,7 @@ func (v *VolumePublishManager) readTrackingInfo(
 	}
 
 	lockContext := "ReadTrackingInfo"
-	defer utils.Unlock(ctx, lockContext, filename)
+	defer locks.Unlock(ctx, lockContext, filename)
 	if !attemptLock(ctx, lockContext, filename, csiNodeLockTimeout) {
 		return nil, status.Error(codes.Aborted, "request waited too long for the lock on tracking file")
 	}
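
The tracking-file paths use the same pattern, but the lock ID is the tracking filename rather than a volume ID, so readers and writers of the same file are serialized while different files proceed independently. A hypothetical sketch of that idea (writeTrackingFileSafely and the plain os.WriteFile call are placeholders; attemptLock and csiNodeLockTimeout are the assumed helpers sketched above):

package example

import (
	"context"
	"errors"
	"os"

	"github.com/netapp/trident/pkg/locks"
)

// writeTrackingFileSafely serializes writers of the same tracking file by locking on its name.
func writeTrackingFileSafely(ctx context.Context, filename string, data []byte) error {
	lockContext := "WriteTrackingInfo"
	defer locks.Unlock(ctx, lockContext, filename)
	if !attemptLock(ctx, lockContext, filename, csiNodeLockTimeout) {
		return errors.New("request waited too long for the lock")
	}
	// Placeholder for the real tracking-file serialization and write logic.
	return os.WriteFile(filename, data, 0o600)
}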

utils/locks.go renamed to pkg/locks/locks.go

Lines changed: 2 additions & 2 deletions

@@ -1,6 +1,6 @@
-// Copyright 2022 NetApp, Inc. All Rights Reserved.
+// Copyright 2025 NetApp, Inc. All Rights Reserved.
 
-package utils
+package locks
 
 import (
 	"context"
Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 // Copyright 2018 NetApp, Inc. All Rights Reserved.
 
-package utils
+package locks
 
 import (
 	"context"

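The renamed package's contents are not shown beyond these headers, but the call sites above pin down its public surface: Lock and Unlock take a context, a free-form lockContext used for logging, and a lockID that names the lock, and WaitQueueSize reports how many callers are queued on a given lockID. Purely as an illustration of that surface (this is not Trident's implementation), a named-lock registry with those three functions might look like this:

package locks

import (
	"context"
	"sync"
)

var (
	mu      sync.Mutex
	lockChs = map[string]chan struct{}{} // a one-slot channel acts as the mutex for each lock ID
	waiters = map[string]int{}           // callers currently queued per lock ID
)

func getLockCh(lockID string) chan struct{} {
	mu.Lock()
	defer mu.Unlock()
	ch, ok := lockChs[lockID]
	if !ok {
		ch = make(chan struct{}, 1)
		lockChs[lockID] = ch
	}
	return ch
}

// Lock blocks until the lock named lockID is acquired; lockContext identifies the caller for tracing.
func Lock(_ context.Context, lockContext, lockID string) {
	ch := getLockCh(lockID)
	mu.Lock()
	waiters[lockID]++
	mu.Unlock()
	ch <- struct{}{} // blocks while another caller holds the lock
	mu.Lock()
	waiters[lockID]--
	mu.Unlock()
}

// Unlock releases the lock named lockID.
func Unlock(_ context.Context, lockContext, lockID string) {
	<-getLockCh(lockID)
}

// WaitQueueSize reports how many callers are currently queued for the lock named lockID.
func WaitQueueSize(lockID string) int {
	mu.Lock()
	defer mu.Unlock()
	return waiters[lockID]
}
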
storage_drivers/ontap/ontap_nas_qtree.go

Lines changed: 13 additions & 13 deletions

@@ -23,12 +23,12 @@ import (
 	"github.com/netapp/trident/pkg/capacity"
 	"github.com/netapp/trident/pkg/collection"
 	"github.com/netapp/trident/pkg/convert"
+	"github.com/netapp/trident/pkg/locks"
 	"github.com/netapp/trident/storage"
 	sa "github.com/netapp/trident/storage_attribute"
 	drivers "github.com/netapp/trident/storage_drivers"
 	"github.com/netapp/trident/storage_drivers/ontap/api"
 	"github.com/netapp/trident/storage_drivers/ontap/awsapi"
-	"github.com/netapp/trident/utils"
 	"github.com/netapp/trident/utils/errors"
 	"github.com/netapp/trident/utils/models"
 )
@@ -326,8 +326,8 @@ func (d *NASQtreeStorageDriver) Create(
 	defer Logd(ctx, d.Name(), d.Config.DebugTraceFlags["method"]).WithFields(fields).Trace("<<<< Create")
 
 	// Ensure any Flexvol we create won't be pruned before we place a qtree on it
-	utils.Lock(ctx, "create", d.sharedLockID)
-	defer utils.Unlock(ctx, "create", d.sharedLockID)
+	locks.Lock(ctx, "create", d.sharedLockID)
+	defer locks.Unlock(ctx, "create", d.sharedLockID)
 
 	// Generic user-facing message
 	createError := errors.New("volume creation failed")
@@ -578,8 +578,8 @@ func (d *NASQtreeStorageDriver) Destroy(ctx context.Context, volConfig *storage.
 	defer Logd(ctx, d.Name(), d.Config.DebugTraceFlags["method"]).WithFields(fields).Trace("<<<< Destroy")
 
 	// Ensure the deleted qtree reaping job doesn't interfere with this workflow
-	utils.Lock(ctx, "destroy", d.sharedLockID)
-	defer utils.Unlock(ctx, "destroy", d.sharedLockID)
+	locks.Lock(ctx, "destroy", d.sharedLockID)
+	defer locks.Unlock(ctx, "destroy", d.sharedLockID)
 
 	// Generic user-facing message
 	deleteError := errors.New("volume deletion failed")
@@ -1603,8 +1603,8 @@ func (d *NASQtreeStorageDriver) queueAllFlexvolsForQuotaResize(ctx context.Conte
 // operation will be attempted each time this method is called until it succeeds.
 func (d *NASQtreeStorageDriver) resizeQuotas(ctx context.Context) {
 	// Ensure we don't forget any Flexvol that is involved in a qtree provisioning workflow
-	utils.Lock(ctx, "resize", d.sharedLockID)
-	defer utils.Unlock(ctx, "resize", d.sharedLockID)
+	locks.Lock(ctx, "resize", d.sharedLockID)
+	defer locks.Unlock(ctx, "resize", d.sharedLockID)
 
 	Logc(ctx).Debug("Housekeeping, resizing quotas.")
 
@@ -1649,8 +1649,8 @@ func (d *NASQtreeStorageDriver) getTotalHardDiskLimitQuota(ctx context.Context,
 // hardcoded prefix on their names) that have no qtrees are deleted.
 func (d *NASQtreeStorageDriver) pruneUnusedFlexvols(ctx context.Context) {
 	// Ensure we don't prune any Flexvol that is involved in a qtree provisioning workflow
-	utils.Lock(ctx, "prune", d.sharedLockID)
-	defer utils.Unlock(ctx, "prune", d.sharedLockID)
+	locks.Lock(ctx, "prune", d.sharedLockID)
+	defer locks.Unlock(ctx, "prune", d.sharedLockID)
 
 	Logc(ctx).Debug("Housekeeping, checking for managed Flexvols with no qtrees.")
 
@@ -1729,8 +1729,8 @@ func (d *NASQtreeStorageDriver) pruneUnusedFlexvols(ctx context.Context) {
 // destroy call failed or was never made due to a process interruption.
 func (d *NASQtreeStorageDriver) reapDeletedQtrees(ctx context.Context) {
 	// Ensure we don't reap any qtree that is involved in a qtree delete workflow
-	utils.Lock(ctx, "reap", d.sharedLockID)
-	defer utils.Unlock(ctx, "reap", d.sharedLockID)
+	locks.Lock(ctx, "reap", d.sharedLockID)
+	defer locks.Unlock(ctx, "reap", d.sharedLockID)
 
 	Logc(ctx).Debug("Housekeeping, checking for deleted qtrees.")
 
@@ -2288,8 +2288,8 @@ func (d *NASQtreeStorageDriver) Resize(ctx context.Context, volConfig *storage.V
 	}
 
 	// Ensure any Flexvol won't be pruned before resize is completed.
-	utils.Lock(ctx, "resize", d.sharedLockID)
-	defer utils.Unlock(ctx, "resize", d.sharedLockID)
+	locks.Lock(ctx, "resize", d.sharedLockID)
+	defer locks.Unlock(ctx, "resize", d.sharedLockID)
 
 	// Generic user-facing message
 	resizeError := errors.New("storage driver failed to resize the volume")
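
Across this driver, the provisioning paths (Create, Destroy, Resize) and the periodic housekeeping jobs (resizeQuotas, pruneUnusedFlexvols, reapDeletedQtrees) all serialize on the same d.sharedLockID, so a Flexvol cannot be pruned, nor a deleted qtree reaped, while a provisioning workflow that depends on it is in flight. A stripped-down sketch of that coordination (qtreeDriver and its method bodies are placeholders, not the real driver):

package example

import (
	"context"

	"github.com/netapp/trident/pkg/locks"
)

type qtreeDriver struct {
	sharedLockID string // one lock ID shared by provisioning and housekeeping
}

func (d *qtreeDriver) Create(ctx context.Context) {
	// Ensure any Flexvol we create won't be pruned before we place a qtree on it.
	locks.Lock(ctx, "create", d.sharedLockID)
	defer locks.Unlock(ctx, "create", d.sharedLockID)
	// ... provision the qtree on a Flexvol ...
}

func (d *qtreeDriver) pruneUnusedFlexvols(ctx context.Context) {
	// Housekeeping waits here while any create/destroy/resize holds the shared lock.
	locks.Lock(ctx, "prune", d.sharedLockID)
	defer locks.Unlock(ctx, "prune", d.sharedLockID)
	// ... delete managed Flexvols that no longer contain qtrees ...
}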
