Skip to content

Commit 014da68

Browse files
committed
Remove SyncPending flag from flock, since it's only used in file_cache.
Remove file_cache reference from policy, and pass pendingOps through conf instead.
1 parent 8daa843 commit 014da68

File tree

7 files changed

+63
-58
lines changed

7 files changed

+63
-58
lines changed

common/lock_map.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ type LockMapItem struct {
3636
mtx sync.RWMutex
3737
downloadTime time.Time
3838
// track if file is in lazy open state
39-
LazyOpen bool
40-
SyncPending bool
39+
LazyOpen bool
4140
}
4241

4342
// Map holding locks for all the files

component/file_cache/async.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ func (fc *FileCache) startScheduler() {
186186
func (fc *FileCache) addPendingOp(name string, flock *common.LockMapItem) {
187187
log.Trace("FileCache::addPendingOp : %s", name)
188188
fc.pendingOps.Store(name, struct{}{})
189-
flock.SyncPending = true
190189
select {
191190
case fc.pendingOpAdded <- struct{}{}:
192191
default: // do not block
@@ -262,7 +261,8 @@ func (fc *FileCache) uploadPendingFile(name string) error {
262261
defer flock.Unlock()
263262

264263
// don't double upload
265-
if !flock.SyncPending {
264+
_, stillPending := fc.pendingOps.Load(name)
265+
if !stillPending {
266266
return nil
267267
}
268268

@@ -308,13 +308,17 @@ func (fc *FileCache) uploadPendingFile(name string) error {
308308
}
309309
}
310310
// update state
311-
flock.SyncPending = false
312311
log.Info("FileCache::uploadPendingFile : File uploaded: %s", name)
313312
fc.pendingOps.Delete(name)
314313

315314
return nil
316315
}
317316

317+
func (fc *FileCache) IsScheduled(objName string) bool {
318+
_, inSchedule := fc.pendingOps.Load(objName)
319+
return inSchedule
320+
}
321+
318322
// this returns true when offline access is enabled, and it's safe to access this object offline
319323
func (fc *FileCache) offlineOperationAllowed(name string) bool {
320324
return fc.offlineAccess && fc.notInCloud(name)

component/file_cache/cache_policy.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ package file_cache
2828
import (
2929
"fmt"
3030
"os"
31+
"sync"
3132

3233
"github.com/Seagate/cloudfuse/common"
3334
"github.com/Seagate/cloudfuse/common/log"
@@ -45,7 +46,8 @@ type cachePolicyConfig struct {
4546
highThreshold float64
4647
lowThreshold float64
4748

48-
fileLocks *common.LockMap // uses object name (common.JoinUnixFilepath)
49+
fileLocks *common.LockMap // uses object name (common.JoinUnixFilepath)
50+
pendingOps *sync.Map
4951

5052
policyTrace bool
5153
}

component/file_cache/file_cache.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ func (fc *FileCache) GetPolicyConfig(conf FileCacheOptions) cachePolicyConfig {
453453
maxSizeMB: conf.MaxSizeMB,
454454
fileLocks: fc.fileLocks,
455455
policyTrace: conf.EnablePolicyTrace,
456+
pendingOps: &fc.pendingOps,
456457
}
457458

458459
return cacheConfig
@@ -1191,7 +1192,6 @@ func (fc *FileCache) DeleteFile(options internal.DeleteFileOptions) error {
11911192

11921193
// update file state
11931194
flock.LazyOpen = false
1194-
flock.SyncPending = false
11951195
// remove deleted file from async upload map
11961196
fc.pendingOps.Delete(options.Name)
11971197

@@ -2118,7 +2118,6 @@ func (fc *FileCache) renamePendingOp(srcName, dstName string, dflock *common.Loc
21182118
_, operationPending := fc.pendingOps.LoadAndDelete(srcName)
21192119
if operationPending {
21202120
fc.pendingOps.Store(dstName, struct{}{})
2121-
dflock.SyncPending = true
21222121
}
21232122
}
21242123

@@ -2144,7 +2143,6 @@ func (fc *FileCache) renameOpenHandles(
21442143
}
21452144
// copy flags
21462145
dflock.LazyOpen = sflock.LazyOpen
2147-
dflock.SyncPending = sflock.SyncPending
21482146
}
21492147
}
21502148

component/file_cache/file_cache_test.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1561,10 +1561,6 @@ loopbackfs:
15611561
suite.assert.FileExists(filepath.Join(suite.fake_storage_path, file))
15621562
_, exists = suite.fileCache.pendingOps.Load(file)
15631563
suite.assert.False(exists, "File should have been removed from pendingOps after upload")
1564-
suite.assert.False(
1565-
suite.fileCache.fileLocks.Get(file).SyncPending,
1566-
"SyncPending flag should be cleared after upload",
1567-
)
15681564
}
15691565

15701566
func (suite *fileCacheTestSuite) TestCronOnToOFFUpload() {
@@ -1630,8 +1626,6 @@ loopbackfs:
16301626
suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, file2))
16311627
_, scheduled := suite.fileCache.pendingOps.Load(file2)
16321628
suite.assert.True(scheduled, "File should be scheduled when scheduler is OFF")
1633-
flock := suite.fileCache.fileLocks.Get(file2)
1634-
suite.assert.True(flock.SyncPending, "SyncPending flag should be set")
16351629
}
16361630

16371631
func (suite *fileCacheTestSuite) TestNoScheduleAlwaysOn() {
@@ -1667,11 +1661,6 @@ loopbackfs:
16671661
uploadedData, err := os.ReadFile(filepath.Join(suite.fake_storage_path, file))
16681662
suite.assert.NoError(err)
16691663
suite.assert.Equal(data, uploadedData, "Uploaded file content should match original")
1670-
1671-
flock := suite.fileCache.fileLocks.Get(file)
1672-
if flock != nil {
1673-
suite.assert.False(flock.SyncPending, "SyncPending flag should be clear")
1674-
}
16751664
}
16761665

16771666
func (suite *fileCacheTestSuite) TestExistingCloudFileImmediateUpload() {
@@ -1799,12 +1788,6 @@ loopbackfs:
17991788

18001789
_, existsInScheduleNew := suite.fileCache.pendingOps.Load(dstFile)
18011790
suite.assert.True(existsInScheduleNew, "New file name should be in pendingOps after rename")
1802-
1803-
// Check that file lock status was properly transferred
1804-
flock := suite.fileCache.fileLocks.Get(dstFile)
1805-
if flock != nil {
1806-
suite.assert.True(flock.SyncPending, "SyncPending flag should be set on renamed file")
1807-
}
18081791
}
18091792

18101793
func (suite *fileCacheTestSuite) TestDeleteFileAndPendingOps() {

component/file_cache/lru_policy.go

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,13 @@ type lruPolicy struct {
7373

7474
// DU utility was found on the path or not
7575
duPresent bool
76-
77-
// Tracks scheduled files to skip during eviction
78-
schedule *FileCache
7976
}
8077

8178
// LRUPolicySnapshot represents the *persisted state* of lruPolicy.
8279
// It contains only the fields that need to be saved, and they are exported.
8380
type LRUPolicySnapshot struct {
8481
NodeList []string // Just node names, *without their fc.tmp prefix*, in linked list order
85-
SyncPendingFlags []bool // the SyncPending flag for each node, in the same order as NodeList
82+
SyncPendingFlags []bool // whether each file in NodeList belongs in the pendingOps map
8683
CurrMarkerPosition uint64 // Node index of currMarker
8784
LastMarkerPosition uint64 // Node index of lastMarker
8885
}
@@ -161,11 +158,6 @@ func (p *lruPolicy) ShutdownPolicy() error {
161158
return p.createSnapshot().writeToFile(p.tmpPath)
162159
}
163160

164-
func (fc *FileCache) IsScheduled(objName string) bool {
165-
_, inSchedule := fc.pendingOps.Load(objName)
166-
return inSchedule
167-
}
168-
169161
func (p *lruPolicy) createSnapshot() *LRUPolicySnapshot {
170162
log.Trace("lruPolicy::saveSnapshot")
171163
var snapshot LRUPolicySnapshot
@@ -183,11 +175,10 @@ func (p *lruPolicy) createSnapshot() *LRUPolicySnapshot {
183175
snapshot.LastMarkerPosition = index
184176
case strings.HasPrefix(current.name, p.tmpPath):
185177
relName := current.name[len(p.tmpPath):]
186-
objName := common.NormalizeObjectName(relName[1:])
187178
snapshot.NodeList = append(snapshot.NodeList, relName)
188-
flock := p.fileLocks.Get(objName)
189-
// don't acquire the locks - worst case scenario, we resync something unnecessarily
190-
snapshot.SyncPendingFlags = append(snapshot.SyncPendingFlags, flock.SyncPending)
179+
objName := common.NormalizeObjectName(relName[1:])
180+
_, isPending := p.pendingOps.Load(objName)
181+
snapshot.SyncPendingFlags = append(snapshot.SyncPendingFlags, isPending)
191182
default:
192183
log.Err("lruPolicy::saveSnapshot : %s Ignoring unrecognized cache path", current.name)
193184
}
@@ -201,7 +192,7 @@ func (p *lruPolicy) loadSnapshot(snapshot *LRUPolicySnapshot) {
201192
return
202193
}
203194
// maintain backward compatibility
204-
loadSyncPending := len(snapshot.NodeList) == len(snapshot.SyncPendingFlags)
195+
loadPendingOps := len(snapshot.NodeList) == len(snapshot.SyncPendingFlags)
205196
p.Lock()
206197
defer p.Unlock()
207198
// walk the slice and write the entries into the policy
@@ -210,11 +201,10 @@ func (p *lruPolicy) loadSnapshot(snapshot *LRUPolicySnapshot) {
210201
nextNode := p.head
211202
tail := p.lastMarker
212203
for i, v := range snapshot.NodeList {
213-
// restore SyncPending flag
214-
if loadSyncPending && snapshot.SyncPendingFlags[i] {
204+
// populate pendingOps
205+
if loadPendingOps && snapshot.SyncPendingFlags[i] {
215206
objName := v[1:]
216-
flock := p.fileLocks.Get(objName)
217-
flock.SyncPending = true
207+
p.pendingOps.Store(objName, struct{}{})
218208
}
219209
// recreate the node
220210
fullPath := filepath.Join(p.tmpPath, v)
@@ -524,7 +514,7 @@ func (p *lruPolicy) deleteExpiredNodes() {
524514
if objName[0] == '/' {
525515
objName = objName[1:]
526516
}
527-
if p.schedule != nil && p.schedule.IsScheduled(objName) {
517+
if _, syncPending := p.pendingOps.Load(objName); syncPending {
528518
continue
529519
}
530520

@@ -594,7 +584,7 @@ func (p *lruPolicy) deleteItem(name string) {
594584
}
595585

596586
// check if the file is pending upload (it was modified offline)
597-
if flock.SyncPending {
587+
if _, syncPending := p.pendingOps.Load(objName); syncPending {
598588
log.Warn("lruPolicy::DeleteItem : %s File is not synchronized to cloud storage", name)
599589
p.CacheValid(name)
600590
return

component/file_cache/lru_policy_test.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,10 @@ func (suite *lruPolicyTestSuite) verifyPolicy(expectedPolicy, actualPolicy *lruP
315315
suite.assert.Same(actualPolicy.lastMarker, actual)
316316
default:
317317
suite.assert.Equal(expected.name, actual.name)
318+
objName := expected.name[len(suite.policy.tmpPath)+1:]
319+
_, expectedPending := expectedPolicy.pendingOps.Load(objName)
320+
_, actualPending := actualPolicy.pendingOps.Load(objName)
321+
suite.assert.Equal(expectedPending, actualPending)
318322
}
319323
suite.assert.NotNil(actual, "actual list is shorter than expected")
320324
suite.assert.NotNil(expected, "actual list is longer than expected")
@@ -337,6 +341,32 @@ func (suite *lruPolicyTestSuite) TestCreateSnapshotEmpty() {
337341
suite.verifyPolicy(originalPolicy, suite.policy)
338342
}
339343

344+
func (suite *lruPolicyTestSuite) TestCreateSnapshot() {
345+
defer suite.cleanupTest()
346+
// setup
347+
numFiles := 5
348+
pathPrefix := filepath.Join(cache_path, "temp")
349+
for i := 1; i <= numFiles; i++ {
350+
suite.policy.CacheValid(pathPrefix + fmt.Sprint(i))
351+
if i > 3 {
352+
suite.policy.pendingOps.Store("temp"+fmt.Sprint(i), struct{}{})
353+
}
354+
}
355+
originalPolicy := suite.policy
356+
// test
357+
snapshot := suite.policy.createSnapshot()
358+
suite.cleanupTest()
359+
suite.setupTestHelper(originalPolicy.cachePolicyConfig)
360+
suite.policy.loadSnapshot(snapshot)
361+
// assert
362+
suite.assert.NotNil(snapshot)
363+
suite.assert.Len(snapshot.NodeList, numFiles)
364+
for i, v := range snapshot.NodeList {
365+
suite.assert.Equal(pathPrefix+fmt.Sprint(numFiles-i), filepath.Join(cache_path, v))
366+
}
367+
suite.verifyPolicy(originalPolicy, suite.policy)
368+
}
369+
340370
func (suite *lruPolicyTestSuite) TestCreateSnapshotWithTrailingMarkers() {
341371
defer suite.cleanupTest()
342372
// setup
@@ -481,6 +511,7 @@ func (suite *lruPolicyTestSuite) TestSnapshotSerialization() {
481511
NodeList: []string{"a", "b", "c"},
482512
CurrMarkerPosition: 1,
483513
LastMarkerPosition: 2,
514+
SyncPendingFlags: []bool{true, false, false},
484515
}
485516
// test
486517
err := snapshot.writeToFile(cache_path)
@@ -494,12 +525,11 @@ func (suite *lruPolicyTestSuite) TestSnapshotSerialization() {
494525
func (suite *lruPolicyTestSuite) TestNoEvictionIfInPendingOps() {
495526
defer suite.cleanupTest()
496527

497-
fileName := filepath.Join(cache_path, "scheduled_file")
528+
name := "pending_file"
529+
fileName := filepath.Join(cache_path, name)
498530
suite.policy.CacheValid(fileName)
499531

500-
fakeSchedule := &FileCache{}
501-
fakeSchedule.pendingOps.Store(common.NormalizeObjectName("scheduled_file"), struct{}{})
502-
suite.policy.schedule = fakeSchedule
532+
suite.policy.pendingOps.Store(name, struct{}{})
503533

504534
time.Sleep(2 * time.Second)
505535

@@ -509,20 +539,19 @@ func (suite *lruPolicyTestSuite) TestNoEvictionIfInPendingOps() {
509539
func (suite *lruPolicyTestSuite) TestEvictionRespectsPendingOps() {
510540
defer suite.cleanupTest()
511541

542+
objNames := []string{"File1", "file2", "file3", "file4"}
512543
fileNames := []string{
513-
filepath.Join(cache_path, "file1"),
514-
filepath.Join(cache_path, "file2"),
515-
filepath.Join(cache_path, "file3"),
516-
filepath.Join(cache_path, "file4"),
544+
filepath.Join(cache_path, objNames[0]),
545+
filepath.Join(cache_path, objNames[1]),
546+
filepath.Join(cache_path, objNames[2]),
547+
filepath.Join(cache_path, objNames[3]),
517548
}
518549
for _, name := range fileNames {
519550
suite.policy.CacheValid(name)
520551
}
521552

522-
fakeSchedule := &FileCache{}
523-
fakeSchedule.pendingOps.Store(common.NormalizeObjectName("file2"), struct{}{})
524-
fakeSchedule.pendingOps.Store(common.NormalizeObjectName("file4"), struct{}{})
525-
suite.policy.schedule = fakeSchedule
553+
suite.policy.pendingOps.Store(objNames[1], struct{}{})
554+
suite.policy.pendingOps.Store(objNames[3], struct{}{})
526555

527556
time.Sleep(3 * time.Second)
528557

0 commit comments

Comments
 (0)