From 898b2662c517ae344b9fb1a20fa7f855f77af0d5 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Mon, 14 Oct 2024 11:49:10 -0600 Subject: [PATCH 001/136] Remove TODO - race condition is already handled --- component/file_cache/file_cache.go | 1 - 1 file changed, 1 deletion(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 95179eab3..376818bbe 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -390,7 +390,6 @@ func (fc *FileCache) invalidateDirectory(name string) { log.Trace("FileCache::invalidateDirectory : %s", name) localPath := filepath.Join(fc.tmpPath, name) - // TODO : wouldn't this cause a race condition? a thread might get the lock before we purge - and the file would be non-existent // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file' var directoriesToPurge []string err := filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error { From 31c12e0bf6e00335c5677751cdcb8b3e8599dd2b Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 24 Oct 2024 17:33:19 -0600 Subject: [PATCH 002/136] Add CloudUnreachableError from async-cloud branch --- common/types.go | 23 +++++++++++++++++++++++ component/s3storage/utils.go | 7 +++++++ 2 files changed, 30 insertions(+) diff --git a/common/types.go b/common/types.go index fe4ba2e81..5a89aa9d9 100644 --- a/common/types.go +++ b/common/types.go @@ -84,6 +84,29 @@ func CloudfuseVersion_() string { return cloudfuseVersion_ } +// custom error shared by different components +type CloudUnreachableError struct { + Message string + CloudStorageError error +} + +func NewCloudUnreachableError(originalError error) CloudUnreachableError { + return CloudUnreachableError{ + Message: "Failed to connect to cloud storage", + CloudStorageError: originalError, + } +} +func (e CloudUnreachableError) Error() string { + return fmt.Sprintf("%s. 
Here's why: %v", e.Message, e.CloudStorageError) +} +func (e CloudUnreachableError) Unwrap() error { + return e.CloudStorageError +} +func (e CloudUnreachableError) Is(target error) bool { + _, ok := target.(*CloudUnreachableError) + return ok +} + var DefaultWorkDir string var DefaultLogFilePath string var StatsConfigFilePath string diff --git a/component/s3storage/utils.go b/component/s3storage/utils.go index cb64d67a3..51d315d1f 100644 --- a/component/s3storage/utils.go +++ b/component/s3storage/utils.go @@ -37,6 +37,7 @@ import ( "github.com/Seagate/cloudfuse/common/log" "github.com/Seagate/cloudfuse/internal" + "github.com/aws/aws-sdk-go-v2/aws/retry" "github.com/aws/smithy-go" ) @@ -130,6 +131,12 @@ func parseS3Err(err error, attemptedAction string) error { } } + var maxAttempts *retry.MaxAttemptsError + if errors.As(err, &maxAttempts) { + log.Err("%s : Failed to %s because cloud storage is unreachable", functionName, attemptedAction) + return common.NewCloudUnreachableError(err) + } + // unrecognized error - parsing failed // print and return the original error log.Err("%s : Failed to %s. 
Here's why: %v", functionName, attemptedAction, err) From 808abe7dfaf62729cb1e3e37b66a861d2bad0b4f Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 24 Oct 2024 17:36:05 -0600 Subject: [PATCH 003/136] Add config option to allow users to disable network fault tolerance ("block-offline-access") --- component/file_cache/file_cache.go | 11 +++++++---- setup/baseConfig.yaml | 1 + 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 376818bbe..160a645ae 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -64,6 +64,7 @@ type FileCache struct { mountPath string // uses os.Separator (filepath.Join) allowOther bool offloadIO bool + offlineAccess bool syncToFlush bool syncToDelete bool maxCacheSize float64 @@ -95,8 +96,9 @@ type FileCacheOptions struct { AllowNonEmpty bool `config:"allow-non-empty-temp" yaml:"allow-non-empty-temp,omitempty"` CleanupOnStart bool `config:"cleanup-on-start" yaml:"cleanup-on-start,omitempty"` - EnablePolicyTrace bool `config:"policy-trace" yaml:"policy-trace,omitempty"` - OffloadIO bool `config:"offload-io" yaml:"offload-io,omitempty"` + BlockOfflineAccess bool `config:"block-offline-access" yaml:"block-offline-access,omitempty"` + EnablePolicyTrace bool `config:"policy-trace" yaml:"policy-trace,omitempty"` + OffloadIO bool `config:"offload-io" yaml:"offload-io,omitempty"` // v1 support V1Timeout uint32 `config:"file-cache-timeout-in-seconds" yaml:"-"` @@ -234,6 +236,7 @@ func (c *FileCache) Configure(_ bool) error { c.cleanupOnStart = conf.CleanupOnStart c.policyTrace = conf.EnablePolicyTrace c.offloadIO = conf.OffloadIO + c.offlineAccess = !conf.BlockOfflineAccess c.syncToFlush = conf.SyncToFlush c.syncToDelete = !conf.SyncNoOp c.refreshSec = conf.RefreshSec @@ -323,8 +326,8 @@ func (c *FileCache) Configure(_ bool) error { c.diskHighWaterMark = (((conf.MaxSizeMB * MB) * float64(cacheConfig.highThreshold)) / 
100) } - log.Info("FileCache::Configure : create-empty %t, cache-timeout %d, tmp-path %s, max-size-mb %d, high-mark %d, low-mark %d, refresh-sec %v, max-eviction %v, hard-limit %v, policy %s, allow-non-empty-temp %t, cleanup-on-start %t, policy-trace %t, offload-io %t, sync-to-flush %t, ignore-sync %t, defaultPermission %v, diskHighWaterMark %v, maxCacheSize %v, mountPath %v", - c.createEmptyFile, int(c.cacheTimeout), c.tmpPath, int(cacheConfig.maxSizeMB), int(cacheConfig.highThreshold), int(cacheConfig.lowThreshold), c.refreshSec, cacheConfig.maxEviction, c.hardLimit, conf.Policy, c.allowNonEmpty, c.cleanupOnStart, c.policyTrace, c.offloadIO, c.syncToFlush, c.syncToDelete, c.defaultPermission, c.diskHighWaterMark, c.maxCacheSize, c.mountPath) + log.Info("FileCache::Configure : create-empty %t, cache-timeout %d, tmp-path %s, max-size-mb %d, high-mark %d, low-mark %d, refresh-sec %v, max-eviction %v, hard-limit %v, policy %s, allow-non-empty-temp %t, cleanup-on-start %t, policy-trace %t, offload-io %t, offline-access %t, sync-to-flush %t, ignore-sync %t, defaultPermission %v, diskHighWaterMark %v, maxCacheSize %v, mountPath %v", + c.createEmptyFile, int(c.cacheTimeout), c.tmpPath, int(cacheConfig.maxSizeMB), int(cacheConfig.highThreshold), int(cacheConfig.lowThreshold), c.refreshSec, cacheConfig.maxEviction, c.hardLimit, conf.Policy, c.allowNonEmpty, c.cleanupOnStart, c.policyTrace, c.offloadIO, c.offlineAccess, c.syncToFlush, c.syncToDelete, c.defaultPermission, c.diskHighWaterMark, c.maxCacheSize, c.mountPath) return nil } diff --git a/setup/baseConfig.yaml b/setup/baseConfig.yaml index f9c2f4269..cd296f2c6 100644 --- a/setup/baseConfig.yaml +++ b/setup/baseConfig.yaml @@ -105,6 +105,7 @@ file_cache: create-empty-file: true|false allow-non-empty-temp: true|false cleanup-on-start: true|false + block-offline-access: true|false policy-trace: true|false offload-io: true|false sync-to-flush: true|false From 21456c4a3d8df803268a1e0482c5fecb24537b3c Mon Sep 17 00:00:00 
2001 From: Michael Habinsky Date: Thu, 24 Oct 2024 20:40:19 -0600 Subject: [PATCH 004/136] Add new error for uncached directories --- common/types.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/common/types.go b/common/types.go index 5a89aa9d9..e259d6de9 100644 --- a/common/types.go +++ b/common/types.go @@ -84,7 +84,7 @@ func CloudfuseVersion_() string { return cloudfuseVersion_ } -// custom error shared by different components +// custom errors shared by different components type CloudUnreachableError struct { Message string CloudStorageError error @@ -107,6 +107,30 @@ func (e CloudUnreachableError) Is(target error) bool { return ok } +// NoCachedDataError reports that a request could not be answered from cached data. +type NoCachedDataError struct { + Message string + CacheError error +} + +// NewNoCachedDataError wraps originalError in a NoCachedDataError so callers can detect it with errors.Is. +func NewNoCachedDataError(originalError error) NoCachedDataError { + return NoCachedDataError{ + Message: "No cached data available", + CacheError: originalError, + } +} +func (e NoCachedDataError) Error() string { + return fmt.Sprintf("%s. 
Here's why: %v", e.Message, e.CacheError) +} +func (e NoCachedDataError) Unwrap() error { + return e.CacheError +} +func (e NoCachedDataError) Is(target error) bool { + _, ok := target.(*NoCachedDataError) + return ok +} + var DefaultWorkDir string var DefaultLogFilePath string var StatsConfigFilePath string From 257bf5fb9aec6ac179c9bd867e860ae3737179ff Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 24 Oct 2024 21:56:23 -0600 Subject: [PATCH 005/136] Handle cloud errors in ac StreamDir --- component/attr_cache/attr_cache.go | 31 +++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/component/attr_cache/attr_cache.go b/component/attr_cache/attr_cache.go index 22fd5514d..89d3cc162 100644 --- a/component/attr_cache/attr_cache.go +++ b/component/attr_cache/attr_cache.go @@ -27,6 +27,7 @@ package attr_cache import ( "context" + "errors" "fmt" "os" "path" @@ -36,6 +37,7 @@ import ( "syscall" "time" + "github.com/Seagate/cloudfuse/common" "github.com/Seagate/cloudfuse/common/config" "github.com/Seagate/cloudfuse/common/log" "github.com/Seagate/cloudfuse/internal" @@ -421,11 +423,22 @@ func (ac *AttrCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O options.Name, numAdded, len(pathList)) } } - } - // add cached items in - if len(cachedPathList) > 0 { - log.Info("AttrCache::StreamDir : %s merging in %d list cache entries...", options.Name, len(cachedPathList)) - pathList = append(pathList, cachedPathList...) 
+ } else { + var cloudUnreachableError *common.CloudUnreachableError + if errors.As(err, &cloudUnreachableError) { + // return whatever entries we have (but only if the token is empty) + entry, found := ac.cache.get(options.Name) + if options.Token == "" && found { + for _, v := range entry.children { + if v.exists() && v.valid() { + pathList = append(pathList, v.attr) + } + } + } else { + // the cloud is unavailable, and we have nothing to provide + return pathList, nextToken, common.NewNoCachedDataError(err) + } + } } // values should be returned in ascending order by key, without duplicates // sort @@ -436,7 +449,11 @@ func (ac *AttrCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O pathList = slices.CompactFunc[[]*internal.ObjAttr, *internal.ObjAttr](pathList, func(a, b *internal.ObjAttr) bool { return a.Path == b.Path }) - ac.cacheListSegment(pathList, options.Name, options.Token, nextToken) + if err == nil { + ac.cacheListSegment(pathList, options.Name, options.Token, nextToken) + } else { + log.Err("AttrCache::StreamDir : %s encountered error [%v]", err) + } log.Trace("AttrCache::StreamDir : %s returning %d entries", options.Name, len(pathList)) return pathList, nextToken, err } @@ -457,7 +474,7 @@ func (ac *AttrCache) fetchCachedDirList(path string, token string) ([]*internal. listDirCache, found := ac.cache.get(path) if !found { log.Warn("AttrCache::fetchCachedDirList : %s directory not found in cache", path) - return pathList, "", fmt.Errorf("%s directory not found in cache", path) + return pathList, "", common.NewNoCachedDataError(fmt.Errorf("%s directory not found in cache", path)) } // is the requested data cached? if listDirCache.listCache == nil { From 5c46e981583ceb7d4b36e8bc48d18834c80d2b5d Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Fri, 25 Oct 2024 15:33:08 -0600 Subject: [PATCH 006/136] Return descriptive errors to allow calling components to receive cached data but also know if the cloud connection is down. 
--- component/attr_cache/attr_cache.go | 65 ++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/component/attr_cache/attr_cache.go b/component/attr_cache/attr_cache.go index 89d3cc162..d3c07b49a 100644 --- a/component/attr_cache/attr_cache.go +++ b/component/attr_cache/attr_cache.go @@ -424,8 +424,7 @@ func (ac *AttrCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O } } } else { - var cloudUnreachableError *common.CloudUnreachableError - if errors.As(err, &cloudUnreachableError) { + if errors.Is(err, &common.CloudUnreachableError{}) { // return whatever entries we have (but only if the token is empty) entry, found := ac.cache.get(options.Name) if options.Token == "" && found { @@ -449,6 +448,7 @@ func (ac *AttrCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O pathList = slices.CompactFunc[[]*internal.ObjAttr, *internal.ObjAttr](pathList, func(a, b *internal.ObjAttr) bool { return a.Path == b.Path }) + // cache the listing (if there was no error) if err == nil { ac.cacheListSegment(pathList, options.Name, options.Token, nextToken) } else { @@ -963,18 +963,11 @@ func (ac *AttrCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr value, found := ac.cache.get(options.Name) ac.cacheLock.RUnlock() if found && value.valid() && time.Since(value.cachedAt).Seconds() < float64(ac.cacheTimeout) { - // Try to serve the request from the attribute cache - // Is the entry marked deleted? - if !value.exists() { - log.Debug("AttrCache::GetAttr : %s (ENOENT) served from cache", options.Name) - return nil, syscall.ENOENT - } - // IsMetadataRetrieved is false in the case of ADLS List since the API does not support metadata. - // Once migration of ADLS list to blob endpoint is done (in future service versions), we can remove this. - // options.RetrieveMetadata is set by CopyFromFile and WriteFile which need metadata to ensure it is preserved. 
- if value.attr.IsMetadataRetrieved() || (!ac.enableSymlinks && !options.RetrieveMetadata) { - // path exists and we have all the metadata required or we do not care about metadata - return value.attr, nil + // Serve the request from the attribute cache + attr, err, metadataOkay := ac.getAttrFromItem(value, options) + // we need to make sure metadata flags look good (this is only an issue for Azure) + if metadataOkay { + return attr, err } } @@ -994,17 +987,47 @@ func (ac *AttrCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr if ac.cacheDirs { ac.markAncestorsInCloud(getParentDir(options.Name), time.Now()) } - } else if err == syscall.ENOENT { - // cache this entity not existing - ac.cache.insert(insertOptions{ - attr: internal.CreateObjAttr(options.Name, 0, time.Now()), - exists: false, - cachedAt: time.Now(), - }) + } else { + if err == syscall.ENOENT { + // cache this entity not existing + ac.cache.insert(insertOptions{ + attr: internal.CreateObjAttr(options.Name, 0, time.Now()), + exists: false, + cachedAt: time.Now(), + }) + } + // is the cloud connection down? + if errors.Is(err, &common.CloudUnreachableError{}) { + // do we have an entry, but it's expired? Let's serve that. + if found && value.valid() { + // Serve the request from the attribute cache + attr, getAttrErr, metadataOkay := ac.getAttrFromItem(value, options) + // we need to make sure metadata flags look good (this is only an issue for Azure) + if metadataOkay { + return attr, errors.Join(getAttrErr, err) + } + } + } } return pathAttr, err } +// return a GetAttr response from the attribute cache item +func (ac *AttrCache) getAttrFromItem(value *attrCacheItem, options internal.GetAttrOptions) (attr *internal.ObjAttr, err error, metadataOkay bool) { + // Is the entry marked deleted? 
+ if !value.exists() { + log.Debug("AttrCache::GetAttr : %s (ENOENT) served from cache", options.Name) + err = syscall.ENOENT + metadataOkay = true + } else { + // IsMetadataRetrieved is false in the case of ADLS List since the API does not support metadata. + // Once migration of ADLS list to blob endpoint is done (in future service versions), we can remove this. + // options.RetrieveMetadata is set by CopyFromFile and WriteFile which need metadata to ensure it is preserved. + metadataOkay = value.attr.IsMetadataRetrieved() || (!ac.enableSymlinks && !options.RetrieveMetadata) + } + return value.attr, err, metadataOkay +} + // CreateLink : Mark the new link invalid func (ac *AttrCache) CreateLink(options internal.CreateLinkOptions) error { log.Trace("AttrCache::CreateLink : Create symlink %s -> %s", options.Name, options.Target) From 8ee353de99232ca7e1f7c9fdb6ab9f4c05e71f3d Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Fri, 25 Oct 2024 21:53:41 -0600 Subject: [PATCH 007/136] Add listingComplete flag, set it, and use it to guess whether items exist in cloud storage. Also account for invalid entries with AttrFlagExists set, which are gaps in knowledge even when the directory is listed. 
--- component/attr_cache/attr_cache.go | 35 ++++++++++++++++++++++++++++++ component/attr_cache/cacheMap.go | 2 ++ 2 files changed, 37 insertions(+) diff --git a/component/attr_cache/attr_cache.go b/component/attr_cache/attr_cache.go index d3c07b49a..091ee3dc3 100644 --- a/component/attr_cache/attr_cache.go +++ b/component/attr_cache/attr_cache.go @@ -450,7 +450,13 @@ func (ac *AttrCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O }) // cache the listing (if there was no error) if err == nil { + // record when the directory was listed, an up to what token + // this will allow us to serve directory listings from this cache ac.cacheListSegment(pathList, options.Name, options.Token, nextToken) + // if the listing is complete, record the fact that we have a complete listing + if nextToken == "" { + ac.markListingComplete(options.Name) + } } else { log.Err("AttrCache::StreamDir : %s encountered error [%v]", err) } @@ -562,6 +568,15 @@ func (ac *AttrCache) cacheListSegment(pathList []*internal.ObjAttr, listDirPath listDirPath, token, nextToken, len(pathList)) } +func (ac *AttrCache) markListingComplete(listDirPath string) { + ac.cacheLock.Lock() + defer ac.cacheLock.Unlock() + listDirItem, found := ac.cache.get(listDirPath) + if found { + listDirItem.listingComplete = true + } +} + // IsDirEmpty: Whether or not the directory is empty func (ac *AttrCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool { log.Trace("AttrCache::IsDirEmpty : %s", options.Name) @@ -1006,6 +1021,26 @@ func (ac *AttrCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr if metadataOkay { return attr, errors.Join(getAttrErr, err) } + } else if found && value.attrFlag.IsSet(AttrFlagExists) { + // This entry says the item exists, but the entry is invalid + // When we flush or sync files or directories, we create entries like this + // These entries correspond to items that *might* exist in the cloud + // So we have to not claim to know whether they exist + return 
nil, common.NewNoCachedDataError(err) + } else { + // we have no cached data about this item + // but do we have a complete listing for its parent directory? + entry, found := ac.cache.get(getParentDir(options.Name)) + if found && entry.listingComplete { + return nil, errors.Join(syscall.ENOENT, err) + } else { + // we have no way of knowing whether the requested item is in the directory in the cloud + // NOTE: + // the OS can call GetAttr on a file without listing its parent directory + // so having a valid file entry in cache does not mean we have a complete listing of its parent + // so we can't just check if the directory has any children as a proxy for whether it's been listed + return nil, common.NewNoCachedDataError(err) + } } } } diff --git a/component/attr_cache/cacheMap.go b/component/attr_cache/cacheMap.go index e275e3c83..f1f51352b 100644 --- a/component/attr_cache/cacheMap.go +++ b/component/attr_cache/cacheMap.go @@ -58,6 +58,8 @@ type attrCacheItem struct { attrFlag common.BitMap16 children map[string]*attrCacheItem parent *attrCacheItem + + listingComplete bool } // all cache entries are organized into this structure From fb0746d46dbb3815efa989f6dbbe95c79c90a8b4 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Fri, 25 Oct 2024 22:28:23 -0600 Subject: [PATCH 008/136] tidy --- component/attr_cache/attr_cache.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/component/attr_cache/attr_cache.go b/component/attr_cache/attr_cache.go index 091ee3dc3..5831053ff 100644 --- a/component/attr_cache/attr_cache.go +++ b/component/attr_cache/attr_cache.go @@ -423,20 +423,18 @@ func (ac *AttrCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O options.Name, numAdded, len(pathList)) } } - } else { - if errors.Is(err, &common.CloudUnreachableError{}) { - // return whatever entries we have (but only if the token is empty) - entry, found := ac.cache.get(options.Name) - if options.Token == "" && found { - for 
_, v := range entry.children { - if v.exists() && v.valid() { - pathList = append(pathList, v.attr) - } + } else if errors.Is(err, &common.CloudUnreachableError{}) { + // return whatever entries we have (but only if the token is empty) + entry, found := ac.cache.get(options.Name) + if options.Token == "" && found { + for _, v := range entry.children { + if v.exists() && v.valid() { + pathList = append(pathList, v.attr) } - } else { - // the cloud is unavailable, and we have nothing to provide - return pathList, nextToken, common.NewNoCachedDataError(err) } + } else { + // the cloud is unavailable, and we have nothing to provide + return pathList, nextToken, common.NewNoCachedDataError(err) } } // values should be returned in ascending order by key, without duplicates From 08f443d69bcab3288fb5edb91861dd8cfd3dc7df Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Fri, 25 Oct 2024 22:30:56 -0600 Subject: [PATCH 009/136] CreateDir, DeleteDir and StreamDir --- component/file_cache/file_cache.go | 65 +++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 160a645ae..842aefb54 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -27,6 +27,7 @@ package file_cache import ( "context" + "errors" "fmt" "io" "io/fs" @@ -61,6 +62,7 @@ type FileCache struct { cleanupOnStart bool policyTrace bool missedChmodList sync.Map // uses object name (common.JoinUnixFilepath) + offlineOps sync.Map // uses object name (common.JoinUnixFilepath) mountPath string // uses os.Separator (filepath.Join) allowOther bool offloadIO bool @@ -428,9 +430,30 @@ func (fc *FileCache) invalidateDirectory(name string) { } } -// Note: The primary purpose of the file cache is to keep track of files that are opened by the user. 
-// So we do not need to support some APIs like Create Directory since the file cache will manage -// creating local directories as needed. +func (fc *FileCache) CreateDir(options internal.CreateDirOptions) error { + log.Trace("FileCache::CreateDir : %s", options.Name) + + err := fc.NextComponent().CreateDir(options) + offline := errors.Is(err, &common.CloudUnreachableError{}) + if err == nil || errors.Is(err, os.ErrExist) || offline && fc.offlineAccess { + // make sure the directory exists in local cache + flock := fc.fileLocks.Get(options.Name) + flock.Lock() + defer flock.Unlock() + localPath := filepath.Join(fc.tmpPath, options.Name) + mkdirErr := os.MkdirAll(localPath, options.Mode.Perm()) + if offline { + if !errors.Is(err, os.ErrExist) { + // This directory's probably local, so remember to sync it later + fc.offlineOps.Store(options.Name, internal.CreateObjAttrDir(options.Name)) + } + // offline access is enabled, so don't return the offline error + err = mkdirErr + } + } + + return err +} // DeleteDir: Recursively invalidate the directory and its children func (fc *FileCache) DeleteDir(options internal.DeleteDirOptions) error { @@ -438,10 +461,33 @@ func (fc *FileCache) DeleteDir(options internal.DeleteDirOptions) error { err := fc.NextComponent().DeleteDir(options) if err != nil { - log.Err("FileCache::DeleteDir : %s failed", options.Name) + log.Err("FileCache::DeleteDir : %s failed. Here's why: %v", options.Name, err) // There is a chance that meta file for directory was not created in which case // rest api delete will fail while we still need to cleanup the local cache for the same } + // is the cloud connection down? 
+ if errors.Is(err, &common.CloudUnreachableError{}) { + // if offline access is disabled, do not touch the local file cache + if !fc.offlineAccess { + return err + } + // offline access is enabled + // to prevent consistency issues: if this is not a local directory, don't delete it locally + attr, getAttrErr := fc.GetAttr(internal.GetAttrOptions{Name: options.Name}) + // is the directory in cloud storage? + if getAttrErr == nil || attr != nil { + // directory exists in cloud storage - so do nothing + return err + } + // do we *know* that the directory does not exist in cloud storage? + if !errors.Is(getAttrErr, os.ErrNotExist) || errors.Is(getAttrErr, &common.NoCachedDataError{}) { + // directory *might* exist in cloud storage (we do not know) - so do nothing + return err + } + // we're pretty sure the directory does not exist in cloud storage + // so it was a local directory, and we need to remove it from the deferred cloud operations + fc.offlineOps.Delete(options.Name) + } fc.invalidateDirectory(options.Name) return err @@ -456,6 +502,13 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O // To cover case 1, grab all entries from storage attrs, token, err := fc.NextComponent().StreamDir(options) + if errors.Is(err, &common.CloudUnreachableError{}) && fc.offlineAccess { + // we're offline and offline access is allowed, so let's check if we have valid a listing + if !errors.Is(err, &common.NoCachedDataError{}) { + // we have a valid listing. Let's drop the error message + err = nil + } + } if err != nil { return attrs, token, err } @@ -1267,9 +1320,9 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // To cover cases 2 and 3, grab the attributes from the local cache localPath := filepath.Join(fc.tmpPath, options.Name) - info, err := os.Stat(localPath) + info, statErr := os.Stat(localPath) // All directory operations are guaranteed to be synced with storage so they cannot be in a case 2 or 3 state. 
- if err == nil && !info.IsDir() { + if statErr == nil && !info.IsDir() { if exists { // Case 3 (file in cloud storage and in local cache) so update the relevant attributes // Return from local cache only if file is not under download or deletion // If file is under download then taking size or mod time from it will be incorrect. From 86a743a9542855609f8d645c17eb3188e913004f Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Fri, 1 Nov 2024 14:05:18 -0600 Subject: [PATCH 010/136] GetAttr --- component/file_cache/file_cache.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 842aefb54..4433965e5 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -1306,6 +1306,24 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // To cover case 1, get attributes from storage var exists bool attrs, err := fc.NextComponent().GetAttr(options) + // are we offline? 
+ if errors.Is(err, &common.CloudUnreachableError{}) { + // if offline access is not enabled, return the error + if !fc.offlineAccess { + return nil, err + } + // offline access is enabled + // but if we have no information to return anyway, just return the error + if errors.Is(err, &common.NoCachedDataError{}) { + return nil, err + } + // we have a valid result - clean up the error + if errors.Is(err, os.ErrNotExist) { + err = syscall.ENOENT + } else { + err = nil + } + } if err != nil { if err == syscall.ENOENT || os.IsNotExist(err) { log.Debug("FileCache::GetAttr : %s does not exist in cloud storage", options.Name) From a90483d6af5b9863ecc85162adbe80777dde690d Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Mon, 11 Nov 2024 17:09:23 -0700 Subject: [PATCH 011/136] parse S3 errors in List() --- component/s3storage/s3wrappers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/component/s3storage/s3wrappers.go b/component/s3storage/s3wrappers.go index 2cf5f0963..7700ec624 100644 --- a/component/s3storage/s3wrappers.go +++ b/component/s3storage/s3wrappers.go @@ -354,8 +354,8 @@ func (cl *Client) List(prefix string, marker *string, count int32) ([]*internal. // fetch and process a single result page output, err := paginator.NextPage(context.Background()) if err != nil { - log.Err("Client::List : Failed to list objects in bucket %v with prefix %v. 
Here's why: %v", prefix, bucketName, err) - return objectAttrList, nil, err + attemptedAction := fmt.Sprintf("list objects in bucket %v with prefix %v", bucketName, prefix) + return objectAttrList, nil, parseS3Err(err, attemptedAction) } if output.IsTruncated != nil && *output.IsTruncated { From 707ba3ee78c4eac2659aee2cb19c9c15e2993f31 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 12 Nov 2024 11:35:03 -0700 Subject: [PATCH 012/136] IsDirEmpty --- component/attr_cache/attr_cache.go | 42 ++++++++++++++++-------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/component/attr_cache/attr_cache.go b/component/attr_cache/attr_cache.go index 5831053ff..0234cf1a2 100644 --- a/component/attr_cache/attr_cache.go +++ b/component/attr_cache/attr_cache.go @@ -212,12 +212,12 @@ func (ac *AttrCache) deleteDirectory(path string, deletedAt time.Time) error { } // does the cache show this path as existing? -func (ac *AttrCache) pathExistsInCache(path string) bool { +func (ac *AttrCache) getItemIfExists(path string) *attrCacheItem { item, found := ac.cache.get(path) - if !found { - return false + if found && item.exists() { + return item } - return item.exists() + return nil } // returns the parent directory (without a trailing slash) @@ -343,7 +343,9 @@ func (ac *AttrCache) CreateDir(options internal.CreateDirOptions) error { exists: true, cachedAt: time.Now(), }) - // update flags for tracking directory existence + // this is a new directory, so we have a complete (empty) listing for it + newDirAttrCacheItem.listingComplete = true + // update flag for tracking directory existence if ac.cacheDirs { newDirAttrCacheItem.markInCloud(false) } @@ -582,38 +584,38 @@ func (ac *AttrCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool { // This function only has a use if we're caching directories if !ac.cacheDirs { log.Debug("AttrCache::IsDirEmpty : %s Dir cache is disabled. 
Checking with container", options.Name) + // when offline, this will return false return ac.NextComponent().IsDirEmpty(options) } // Is the directory in our cache? ac.cacheLock.RLock() - pathInCache := ac.pathExistsInCache(options.Name) - ac.cacheLock.RUnlock() + defer ac.cacheLock.RUnlock() + item := ac.getItemIfExists(options.Name) // If the directory does not exist in the attribute cache then let the next component answer - if !pathInCache { + if item == nil { log.Debug("AttrCache::IsDirEmpty : %s not found in attr_cache. Checking with container", options.Name) return ac.NextComponent().IsDirEmpty(options) } log.Debug("AttrCache::IsDirEmpty : %s found in attr_cache", options.Name) // Check if the cached directory is empty or not - if ac.anyContentsInCache(options.Name) { + if item.hasExistingChildren() { log.Debug("AttrCache::IsDirEmpty : %s has a subpath in attr_cache", options.Name) return false } + // do we have a complete listing? + if item.listingComplete { + // we know the directory is empty + return true + } // Dir is in cache but no contents are, so check with container log.Debug("AttrCache::IsDirEmpty : %s children not found in cache. 
Checking with container", options.Name) return ac.NextComponent().IsDirEmpty(options) } -func (ac *AttrCache) anyContentsInCache(prefix string) bool { - ac.cacheLock.RLock() - defer ac.cacheLock.RUnlock() - - directory, found := ac.cache.get(prefix) - if found && directory.exists() { - for _, chldItem := range directory.children { - if chldItem.exists() { - return true - } +func (directory *attrCacheItem) hasExistingChildren() bool { + for _, childItem := range directory.children { + if childItem.exists() { + return true } } return false @@ -635,7 +637,7 @@ func (ac *AttrCache) RenameDir(options internal.RenameDirOptions) error { if ac.cacheDirs { // if attr_cache is tracking directories, validate this rename // First, check if the destination directory already exists - if ac.pathExistsInCache(options.Dst) { + if ac.getItemIfExists(options.Dst) != nil { return os.ErrExist } } else { From bed166b497ef4ec0f80ce20a3f5bbb0922ab3746 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 12 Nov 2024 11:41:20 -0700 Subject: [PATCH 013/136] Fix tests --- component/file_cache/file_cache_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index 761896049..1acb5bd9f 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -345,8 +345,8 @@ func (suite *fileCacheTestSuite) TestCreateDir() { err := suite.fileCache.CreateDir(options) suite.assert.NoError(err) - // Path should not be added to the file cache - suite.assert.NoDirExists(filepath.Join(suite.cache_path, path)) + // Path should be added to the file cache + suite.assert.DirExists(filepath.Join(suite.cache_path, path)) // Path should be in fake storage suite.assert.DirExists(filepath.Join(suite.fake_storage_path, path)) } @@ -1378,8 +1378,8 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { time.Sleep(500 * time.Millisecond) // Check once before 
the cache cleanup that file exists suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall exists in cache - time.Sleep(1 * time.Second) // Wait for the cache cleanup to occur - suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall not exists in cache + time.Sleep(1 * time.Second) // Wait for the cache cleanup to occur + suite.assert.NoFileExists(suite.cache_path + "/" + dst) // Dst shall not exists in cache } func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanupWithNoTimeout() { From f1878feaa184e6f5363b6bbc9469564ae89c8784 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 12 Nov 2024 11:43:00 -0700 Subject: [PATCH 014/136] Lengthen wait to prevent inconsistent results --- component/file_cache/file_cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index 1acb5bd9f..ac3252b52 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -1378,7 +1378,7 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { time.Sleep(500 * time.Millisecond) // Check once before the cache cleanup that file exists suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall exists in cache - time.Sleep(1 * time.Second) // Wait for the cache cleanup to occur + time.Sleep(2 * time.Second) // Wait for the cache cleanup to occur suite.assert.NoFileExists(suite.cache_path + "/" + dst) // Dst shall not exists in cache } From f7f5532e04982a71e9fbe1f81431cd37adfb06e1 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 14 Nov 2024 11:55:44 -0700 Subject: [PATCH 015/136] Add parameter to setupTestHelper to use mock instead of loopback --- component/file_cache/file_cache_test.go | 89 ++++++++++++++----------- 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index ac3252b52..4939fb210 
100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -49,6 +49,7 @@ import ( "github.com/Seagate/cloudfuse/component/loopback" "github.com/Seagate/cloudfuse/internal" "github.com/Seagate/cloudfuse/internal/handlemap" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" @@ -60,9 +61,11 @@ type fileCacheTestSuite struct { suite.Suite assert *assert.Assertions fileCache *FileCache - loopback internal.Component + nextComponent internal.Component cache_path string // uses os.Separator (filepath.Join) fake_storage_path string // uses os.Separator (filepath.Join) + mock *internal.MockComponent + mockCtrl *gomock.Controller } func newLoopbackFS() internal.Component { @@ -110,18 +113,24 @@ func (suite *fileCacheTestSuite) SetupTest() { if err != nil { fmt.Printf("fileCacheTestSuite::SetupTest : os.RemoveAll(%s) failed [%v]\n", suite.fake_storage_path, err) } - suite.setupTestHelper(defaultConfig) + suite.setupTestHelper(defaultConfig, false) } -func (suite *fileCacheTestSuite) setupTestHelper(configuration string) { +func (suite *fileCacheTestSuite) setupTestHelper(configuration string, useMock bool) { suite.assert = assert.New(suite.T()) config.ReadConfigFromReader(strings.NewReader(configuration)) - suite.loopback = newLoopbackFS() - suite.fileCache = newTestFileCache(suite.loopback) - err := suite.loopback.Start(context.Background()) + if useMock { + suite.mockCtrl = gomock.NewController(suite.T()) + suite.mock = internal.NewMockComponent(suite.mockCtrl) + suite.nextComponent = suite.mock + } else { + suite.nextComponent = newLoopbackFS() + } + suite.fileCache = newTestFileCache(suite.nextComponent) + err := suite.nextComponent.Start(context.Background()) if err != nil { - panic(fmt.Sprintf("Unable to start loopback [%s]", err.Error())) + panic(fmt.Sprintf("Unable to start next component [%s]", err.Error())) } err = suite.fileCache.Start(context.Background()) if err != 
nil { @@ -131,7 +140,7 @@ func (suite *fileCacheTestSuite) setupTestHelper(configuration string) { } func (suite *fileCacheTestSuite) cleanupTest() { - suite.loopback.Stop() + suite.nextComponent.Stop() err := suite.fileCache.Stop() if err != nil { panic(fmt.Sprintf("Unable to stop file cache [%s]", err.Error())) @@ -153,7 +162,7 @@ func (suite *fileCacheTestSuite) TestEmpty() { defer suite.cleanupTest() suite.cleanupTest() // teardown the default file cache generated emptyConfig := fmt.Sprintf("file_cache:\n path: %s\n\n offload-io: true\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) - suite.setupTestHelper(emptyConfig) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(emptyConfig, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) suite.assert.Equal("file_cache", suite.fileCache.Name()) suite.assert.Equal(suite.fileCache.tmpPath, suite.cache_path) @@ -187,7 +196,7 @@ func (suite *fileCacheTestSuite) TestConfig() { syncToFlush := false config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n policy: %s\n max-size-mb: %d\n timeout-sec: %d\n max-eviction: %d\n high-threshold: %d\n low-threshold: %d\n create-empty-file: %t\n allow-non-empty-temp: %t\n cleanup-on-start: %t\n sync-to-flush: %t", suite.cache_path, policy, maxSizeMb, cacheTimeout, maxDeletion, highThreshold, lowThreshold, createEmptyFile, allowNonEmptyTemp, cleanupOnStart, syncToFlush) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) suite.assert.Equal("file_cache", suite.fileCache.Name()) suite.assert.Equal(suite.fileCache.tmpPath, suite.cache_path) @@ -209,7 +218,7 @@ func (suite *fileCacheTestSuite) TestDefaultCacheSize() { defer suite.cleanupTest() 
// Setup config := fmt.Sprintf("file_cache:\n path: %s\n", suite.cache_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) var freeDisk int if runtime.GOOS == "windows" { cmd := exec.Command("fsutil", "volume", "diskfree", suite.cache_path) @@ -256,7 +265,7 @@ func (suite *fileCacheTestSuite) TestConfigPolicyTimeout() { cleanupOnStart := true config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n policy: %s\n max-size-mb: %d\n timeout-sec: %d\n max-eviction: %d\n high-threshold: %d\n low-threshold: %d\n create-empty-file: %t\n allow-non-empty-temp: %t\n cleanup-on-start: %t", suite.cache_path, policy, maxSizeMb, cacheTimeout, maxDeletion, highThreshold, lowThreshold, createEmptyFile, allowNonEmptyTemp, cleanupOnStart) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) suite.assert.Equal("file_cache", suite.fileCache.Name()) suite.assert.Equal(suite.fileCache.tmpPath, suite.cache_path) @@ -288,7 +297,7 @@ func (suite *fileCacheTestSuite) TestConfigPolicyDefaultTimeout() { cleanupOnStart := true config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n policy: %s\n max-size-mb: %d\n max-eviction: %d\n high-threshold: %d\n low-threshold: %d\n create-empty-file: %t\n allow-non-empty-temp: %t\n cleanup-on-start: %t", suite.cache_path, policy, maxSizeMb, maxDeletion, highThreshold, lowThreshold, createEmptyFile, allowNonEmptyTemp, cleanupOnStart) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a 
custom config (teardown will occur after the test as usual) suite.assert.Equal("file_cache", suite.fileCache.Name()) suite.assert.Equal(suite.fileCache.tmpPath, suite.cache_path) @@ -320,7 +329,7 @@ func (suite *fileCacheTestSuite) TestConfigZero() { cleanupOnStart := true config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n policy: %s\n max-size-mb: %d\n timeout-sec: %d\n max-eviction: %d\n high-threshold: %d\n low-threshold: %d\n create-empty-file: %t\n allow-non-empty-temp: %t\n cleanup-on-start: %t", suite.cache_path, policy, maxSizeMb, cacheTimeout, maxDeletion, highThreshold, lowThreshold, createEmptyFile, allowNonEmptyTemp, cleanupOnStart) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) suite.assert.Equal("file_cache", suite.fileCache.Name()) suite.assert.Equal(suite.fileCache.tmpPath, suite.cache_path) @@ -358,7 +367,7 @@ func (suite *fileCacheTestSuite) TestDeleteDir() { createEmptyFile := true config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n\nloopbackfs:\n path: %s", suite.cache_path, createEmptyFile, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) dir := "dir" path := dir + "/file" @@ -401,11 +410,11 @@ func (suite *fileCacheTestSuite) TestStreamDirCase1() { file2 := name + "/file2" file3 := name + "/file3" // Create files directly in "fake_storage" - suite.loopback.CreateDir(internal.CreateDirOptions{Name: name, Mode: 0777}) - suite.loopback.CreateDir(internal.CreateDirOptions{Name: subdir, Mode: 0777}) - suite.loopback.CreateFile(internal.CreateFileOptions{Name: 
file1}) - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2}) - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3}) + suite.nextComponent.CreateDir(internal.CreateDirOptions{Name: name, Mode: 0777}) + suite.nextComponent.CreateDir(internal.CreateDirOptions{Name: subdir, Mode: 0777}) + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file1}) + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file2}) + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file3}) // Read the Directory dir, _, err := suite.fileCache.StreamDir(internal.StreamDirOptions{Name: name}) @@ -462,9 +471,9 @@ func (suite *fileCacheTestSuite) TestStreamDirCase3() { suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777}) suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file3, Size: 1024}) // Create the files in fake_storage and simulate different sizes - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0 - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2, Mode: 0777}) - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777}) + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0 + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file2, Mode: 0777}) + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777}) // Read the Directory dir, _, err := suite.fileCache.StreamDir(internal.StreamDirOptions{Name: name}) @@ -509,10 +518,10 @@ func (suite *fileCacheTestSuite) TestStreamDirMixed() { suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file3, Size: 1024}) // Create the files in fake_storage and simulate different sizes - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0 - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 
0777}) + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0 + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777}) - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file4, Mode: 0777}) + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file4, Mode: 0777}) suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file4, Size: 1024}) suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file4, Size: 0}) @@ -584,7 +593,7 @@ func (suite *fileCacheTestSuite) TestRenameDir() { createEmptyFile := true config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n\nloopbackfs:\n path: %s", suite.cache_path, createEmptyFile, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) src := "src" dst := "dst" @@ -733,7 +742,7 @@ func (suite *fileCacheTestSuite) TestCreateFileCreateEmptyFile() { createEmptyFile := true config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n\nloopbackfs:\n path: %s", suite.cache_path, createEmptyFile, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file2" options := internal.CreateFileOptions{Name: path} @@ -753,7 +762,7 @@ func (suite *fileCacheTestSuite) TestCreateFileInDirCreateEmptyFile() { createEmptyFile := true config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n\nloopbackfs:\n path: %s", suite.cache_path, createEmptyFile, suite.fake_storage_path) - 
suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) dir := "dir" path := dir + "/file" @@ -939,7 +948,7 @@ func (suite *fileCacheTestSuite) TestCloseFileTimeout() { cacheTimeout := 1 config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s", suite.cache_path, cacheTimeout, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file10" @@ -996,7 +1005,7 @@ func (suite *fileCacheTestSuite) TestOpenPreventsEviction() { cacheTimeout := 1 config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s", suite.cache_path, cacheTimeout, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file12" @@ -1173,7 +1182,7 @@ func (suite *fileCacheTestSuite) TestGetAttrCase1() { // Setup file := "file24" // Create files directly in "fake_storage" - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777}) + suite.nextComponent.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777}) // Read the Directory attr, err := suite.fileCache.GetAttr(internal.GetAttrOptions{Name: file}) @@ -1353,7 +1362,7 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 1\n\nloopbackfs:\n path: %s", suite.cache_path, 
suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) src := "source4" dst := "destination4" @@ -1518,7 +1527,7 @@ func (suite *fileCacheTestSuite) TestCachePathSymlink() { suite.assert.NoError(err) configuration := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n\nloopbackfs:\n path: %s", symlinkPath, suite.fake_storage_path) - suite.setupTestHelper(configuration) + suite.setupTestHelper(configuration, false) file := "file39" handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777}) @@ -1541,7 +1550,7 @@ func (suite *fileCacheTestSuite) TestZZOffloadIO() { configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: 0\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) - suite.setupTestHelper(configuration) + suite.setupTestHelper(configuration, false) file := "file40" handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777}) @@ -1557,7 +1566,7 @@ func (suite *fileCacheTestSuite) TestZZZZLazyWrite() { configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: 0\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) - suite.setupTestHelper(configuration) + suite.setupTestHelper(configuration, false) suite.fileCache.lazyWrite = true file := "file101" @@ -1584,7 +1593,7 @@ func (suite *fileCacheTestSuite) TestStatFS() { config := fmt.Sprintf("file_cache:\n path: %s\n max-size-mb: %d\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s", suite.cache_path, maxSizeMb, cacheTimeout, suite.fake_storage_path) os.Mkdir(suite.cache_path, 0777) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a 
new file cache with a custom config (teardown will occur after the test as usual) file := "file41" handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777}) @@ -1609,7 +1618,7 @@ func (suite *fileCacheTestSuite) TestReadFileWithRefresh() { createEmptyFile := true config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n timeout-sec: 1000\n refresh-sec: 1\n\nloopbackfs:\n path: %s", suite.cache_path, createEmptyFile, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file42" byteArr := []byte("test data") @@ -1663,7 +1672,7 @@ func (suite *fileCacheTestSuite) TestHardLimitOnSize() { // Configure to create empty files so we create the file in cloud storage config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n hard-limit: true\n max-size-mb: 2\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur after the test as usual) data := make([]byte, 3*MB) pathbig := "filebig" @@ -1725,7 +1734,7 @@ func (suite *fileCacheTestSuite) TestHandleDataChange() { createEmptyFile := true config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n timeout-sec: 1000\n refresh-sec: 10\n\nloopbackfs:\n path: %s", suite.cache_path, createEmptyFile, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + suite.setupTestHelper(config, false) // setup a new file cache with a custom config (teardown will occur 
after the test as usual) path := "file43" err := os.WriteFile(suite.fake_storage_path+"/"+path, []byte("test data"), 0777) From 1350081e4cdf861bfd854c7e38553fbe444c4016 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Mon, 25 Nov 2024 02:33:16 -0700 Subject: [PATCH 016/136] Delete file immediately (not async) in CachePurge --- component/file_cache/cache_policy.go | 2 +- component/file_cache/file_cache.go | 5 ----- component/file_cache/lru_policy.go | 5 ++++- component/file_cache/lru_policy_test.go | 5 +---- 4 files changed, 6 insertions(+), 11 deletions(-) diff --git a/component/file_cache/cache_policy.go b/component/file_cache/cache_policy.go index 58f5a736b..f033bc39d 100644 --- a/component/file_cache/cache_policy.go +++ b/component/file_cache/cache_policy.go @@ -58,7 +58,7 @@ type cachePolicy interface { CacheValid(name string) // Mark the file as hit CacheInvalidate(name string) // Invalidate the file - CachePurge(name string) // Schedule the file for deletion + CachePurge(name string) // Delete the file from cache IsCached(name string) bool // Whether or not the cache policy considers this file cached diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index ff016f481..9a459d59f 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -777,11 +777,6 @@ func (fc *FileCache) DeleteFile(options internal.DeleteFileOptions) error { } localPath := filepath.Join(fc.tmpPath, options.Name) - err = deleteFile(localPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::DeleteFile : failed to delete local file %s [%s]", localPath, err.Error()) - } - fc.policy.CachePurge(localPath) return nil diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index ce003c402..af89d14ea 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -180,7 +180,10 @@ func (p *lruPolicy) CachePurge(name string) { log.Trace("lruPolicy::CachePurge 
: %s", name) p.removeNode(name) - p.deleteEvent <- name + err := deleteFile(name) + if err != nil && !os.IsNotExist(err) { + log.Err("lruPolicy::CachePurge : failed to delete local file %s. Here's why: %v", name, err) + } } func (p *lruPolicy) IsCached(name string) bool { diff --git a/component/file_cache/lru_policy_test.go b/component/file_cache/lru_policy_test.go index 43dae3826..59327249a 100644 --- a/component/file_cache/lru_policy_test.go +++ b/component/file_cache/lru_policy_test.go @@ -238,16 +238,13 @@ func (suite *lruPolicyTestSuite) TestCachePurge() { suite.assert.False(ok) suite.assert.Nil(n) - // test asynchronous file and folder deletion + // test synchronous file and folder deletion // purge all aPaths, in reverse order aPaths, abPaths, acPaths := suite.generateNestedDirectory("temp") for i := len(aPaths) - 1; i >= 0; i-- { suite.policy.CachePurge(aPaths[i]) } - // wait for asynchronous deletions - // in local testing, 1ms was enough - time.Sleep(100 * time.Millisecond) // validate all aPaths were deleted for _, path := range aPaths { suite.assert.NoFileExists(path) From e58a51686792f1a4e48bb8fa0bd3abc0568be7bf Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Mon, 25 Nov 2024 09:47:35 -0700 Subject: [PATCH 017/136] Set test timeout to 1s. Update tests to work with sync deletion. Create files directly in loopback whenever possible. 
--- component/file_cache/file_cache_linux_test.go | 30 +---- component/file_cache/file_cache_test.go | 126 ++++++------------ .../file_cache/file_cache_windows_test.go | 15 +-- component/file_cache/lru_policy_test.go | 62 +++------ 4 files changed, 67 insertions(+), 166 deletions(-) diff --git a/component/file_cache/file_cache_linux_test.go b/component/file_cache/file_cache_linux_test.go index ca30db698..195ca0da6 100644 --- a/component/file_cache/file_cache_linux_test.go +++ b/component/file_cache/file_cache_linux_test.go @@ -62,7 +62,7 @@ func (suite *fileCacheLinuxTestSuite) SetupTest() { rand := randomString(8) suite.cache_path = common.JoinUnixFilepath(home_dir, "file_cache"+rand) suite.fake_storage_path = common.JoinUnixFilepath(home_dir, "fake_storage"+rand) - defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 0\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) + defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 1\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) log.Debug(defaultConfig) // Delete the temp directories created @@ -111,23 +111,15 @@ func (suite *fileCacheLinuxTestSuite) cleanupTest() { func (suite *fileCacheLinuxTestSuite) TestChmodNotInCache() { defer suite.cleanupTest() - // Setup + // Setup - create file directly in fake storage path := "file33" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - - _, err := os.Stat(suite.cache_path + "/" + path) - for i := 0; i < 1000 && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) - _, err = os.Stat(suite.cache_path + "/" + path) - } - suite.assert.True(os.IsNotExist(err)) + suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) // Path should be in fake storage suite.assert.FileExists(suite.fake_storage_path + "/" + path) // Chmod - err = 
suite.fileCache.Chmod(internal.ChmodOptions{Name: path, Mode: os.FileMode(0666)}) + err := suite.fileCache.Chmod(internal.ChmodOptions{Name: path, Mode: os.FileMode(0666)}) suite.assert.NoError(err) // Path in fake storage should be updated @@ -178,7 +170,6 @@ func (suite *fileCacheLinuxTestSuite) TestChmodCase2() { err = suite.fileCache.FlushFile(internal.FlushFileOptions{Handle: createHandle}) suite.assert.NoError(err) - // Path should be in the file cache with old mode (since we failed the operation) info, err := os.Stat(suite.cache_path + "/" + path) suite.assert.NoError(err) suite.assert.EqualValues(info.Mode(), newMode) @@ -187,6 +178,7 @@ func (suite *fileCacheLinuxTestSuite) TestChmodCase2() { suite.assert.NoError(err) // loop until file does not exist - done due to async nature of eviction + time.Sleep(time.Second) _, err = os.Stat(suite.cache_path + "/" + path) for i := 0; i < 1000 && !os.IsNotExist(err); i++ { time.Sleep(10 * time.Millisecond) @@ -206,15 +198,7 @@ func (suite *fileCacheLinuxTestSuite) TestChownNotInCache() { defer suite.cleanupTest() // Setup path := "file36" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - - _, err := os.Stat(suite.cache_path + "/" + path) - for i := 0; i < 1000 && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) - _, err = os.Stat(suite.cache_path + "/" + path) - } - suite.assert.True(os.IsNotExist(err)) + suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) // Path should be in fake storage suite.assert.FileExists(suite.fake_storage_path + "/" + path) @@ -222,7 +206,7 @@ func (suite *fileCacheLinuxTestSuite) TestChownNotInCache() { // Chown owner := os.Getuid() group := os.Getgid() - err = suite.fileCache.Chown(internal.ChownOptions{Name: path, Owner: owner, Group: group}) + err := suite.fileCache.Chown(internal.ChownOptions{Name: path, Owner: owner, Group: 
group}) suite.assert.NoError(err) // Path in fake storage should be updated diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index 761896049..81fbf70f6 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -98,7 +98,7 @@ func (suite *fileCacheTestSuite) SetupTest() { rand := randomString(8) suite.cache_path = filepath.Join(home_dir, "file_cache"+rand) suite.fake_storage_path = filepath.Join(home_dir, "fake_storage"+rand) - defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 0\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) + defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 1\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) log.Debug(defaultConfig) // Delete the temp directories created @@ -354,11 +354,6 @@ func (suite *fileCacheTestSuite) TestCreateDir() { func (suite *fileCacheTestSuite) TestDeleteDir() { defer suite.cleanupTest() // Setup - // Configure to create empty files so we create the file in cloud storage - createEmptyFile := true - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n\nloopbackfs:\n path: %s", - suite.cache_path, createEmptyFile, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) dir := "dir" path := dir + "/file" @@ -376,8 +371,6 @@ func (suite *fileCacheTestSuite) TestDeleteDir() { // Delete the directory err = suite.fileCache.DeleteDir(internal.DeleteDirOptions{Name: dir}) suite.assert.NoError(err) - // wait for asynchronous deletion - time.Sleep(100 * time.Millisecond) // Directory should not be cached suite.assert.NoDirExists(filepath.Join(suite.cache_path, dir)) } @@ -579,13 +572,8 @@ func (suite *fileCacheTestSuite) TestIsDirEmptyFalseInCache() { func (suite *fileCacheTestSuite) 
TestRenameDir() { defer suite.cleanupTest() - // Setup - // Configure to create empty files so we create the file in cloud storage - createEmptyFile := true - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n\nloopbackfs:\n path: %s", - suite.cache_path, createEmptyFile, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + // Setup src := "src" dst := "dst" err := suite.fileCache.CreateDir(internal.CreateDirOptions{Name: src, Mode: 0777}) @@ -602,8 +590,6 @@ func (suite *fileCacheTestSuite) TestRenameDir() { // Rename the directory err = suite.fileCache.RenameDir(internal.RenameDirOptions{Src: src, Dst: dst}) suite.assert.NoError(err) - // wait for asynchronous deletion - time.Sleep(100 * time.Millisecond) // src directory should not exist in local filesystem suite.assert.NoDirExists(filepath.Join(suite.cache_path, src)) // dst directory should exist and have contents from src @@ -859,21 +845,13 @@ func (suite *fileCacheTestSuite) TestDeleteFileError() { func (suite *fileCacheTestSuite) TestOpenFileNotInCache() { defer suite.cleanupTest() path := "file7" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) testData := "test data" data := []byte(testData) - suite.fileCache.WriteFile(internal.WriteFileOptions{Handle: handle, Offset: 0, Data: data}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - - // loop until file does not exist - done due to async nature of eviction - _, err := os.Stat(filepath.Join(suite.cache_path, path)) - for i := 0; i < 1000 && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) - _, err = os.Stat(filepath.Join(suite.cache_path, path)) - } - suite.assert.True(os.IsNotExist(err)) + suite.loopback.WriteFile(internal.WriteFileOptions{Handle: 
handle, Offset: 0, Data: data}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) - handle, err = suite.fileCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR, Mode: suite.fileCache.defaultPermission}) + handle, err := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR, Mode: suite.fileCache.defaultPermission}) suite.assert.NoError(err) // Download is required err = suite.fileCache.downloadFile(handle) @@ -920,6 +898,7 @@ func (suite *fileCacheTestSuite) TestCloseFile() { suite.assert.NoError(err) // loop until file does not exist - done due to async nature of eviction + time.Sleep(time.Second) _, err = os.Stat(filepath.Join(suite.cache_path, path)) for i := 0; i < 1000 && !os.IsNotExist(err); i++ { time.Sleep(10 * time.Millisecond) @@ -935,14 +914,8 @@ func (suite *fileCacheTestSuite) TestCloseFile() { func (suite *fileCacheTestSuite) TestCloseFileTimeout() { defer suite.cleanupTest() - suite.cleanupTest() // teardown the default file cache generated - cacheTimeout := 1 - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s", - suite.cache_path, cacheTimeout, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file10" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) // The file is in the cache but not in cloud storage (see TestCreateFileInDirCreateEmptyFile) @@ -955,9 +928,10 @@ func (suite *fileCacheTestSuite) TestCloseFileTimeout() { // File should be in cloud storage suite.assert.FileExists(filepath.Join(suite.fake_storage_path, path)) + time.Sleep(time.Second) // loop until file does not exist - done due to async nature of eviction _, err = os.Stat(filepath.Join(suite.cache_path, path)) - for i := 0; i < (cacheTimeout*300) && !os.IsNotExist(err); i++ { + for i := 0; i < 300 && 
!os.IsNotExist(err); i++ { time.Sleep(10 * time.Millisecond) _, err = os.Stat(filepath.Join(suite.cache_path, path)) } @@ -991,15 +965,9 @@ func (suite *fileCacheTestSuite) TestOpenCloseHandleCount() { func (suite *fileCacheTestSuite) TestOpenPreventsEviction() { defer suite.cleanupTest() - // Setup - suite.cleanupTest() // teardown the default file cache generated - cacheTimeout := 1 - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s", - suite.cache_path, cacheTimeout, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + // Setup path := "file12" - handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) suite.assert.NoError(err) err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) @@ -1013,7 +981,7 @@ func (suite *fileCacheTestSuite) TestOpenPreventsEviction() { suite.assert.NoError(err) // wait until file would be evicted (if not for being opened) - time.Sleep(time.Second * time.Duration(cacheTimeout*3)) + time.Sleep(time.Second * 3) // File should still be in cache suite.assert.FileExists(filepath.Join(suite.cache_path, path)) @@ -1242,8 +1210,9 @@ func (suite *fileCacheTestSuite) TestGetAttrCase4() { suite.assert.NoError(err) // Wait file is evicted + time.Sleep(time.Second) _, err = os.Stat(filepath.Join(suite.cache_path, file)) - for i := 0; i < 2000 && !os.IsNotExist(err); i++ { + for i := 0; i < 200 && !os.IsNotExist(err); i++ { time.Sleep(10 * time.Millisecond) _, err = os.Stat(filepath.Join(suite.cache_path, file)) } @@ -1275,23 +1244,13 @@ func (suite *fileCacheTestSuite) TestRenameFileNotInCache() { // Setup src := "source1" dst := "destination1" - handle, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0777}) - suite.assert.NoError(err) - err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: 
handle}) - suite.assert.NoError(err) - - _, err = os.Stat(filepath.Join(suite.cache_path, src)) - for i := 0; i < 1000 && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) - _, err = os.Stat(filepath.Join(suite.cache_path, src)) - } - suite.assert.True(os.IsNotExist(err)) + suite.loopback.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0777}) // Path should be in fake storage suite.assert.FileExists(filepath.Join(suite.fake_storage_path, src)) // RenameFile - err = suite.fileCache.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst}) + err := suite.fileCache.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst}) suite.assert.NoError(err) // Path in fake storage should be updated @@ -1349,11 +1308,6 @@ func (suite *fileCacheTestSuite) TestRenameFileCase2() { func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { defer suite.cleanupTest() - suite.cleanupTest() - - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 1\n\nloopbackfs:\n path: %s", - suite.cache_path, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) src := "source4" dst := "destination4" @@ -1375,14 +1329,24 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { suite.assert.NoFileExists(suite.fake_storage_path + "/" + src) // Src does not exist suite.assert.FileExists(suite.fake_storage_path + "/" + dst) // Dst does exist - time.Sleep(500 * time.Millisecond) // Check once before the cache cleanup that file exists suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall exists in cache - time.Sleep(1 * time.Second) // Wait for the cache cleanup to occur - suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall not exists in cache + // Wait for the cache cleanup to occur + time.Sleep(time.Second) + _, err = os.Stat(filepath.Join(suite.cache_path, dst)) + for i := 0; i < 200 && !os.IsNotExist(err); i++ { 
+ time.Sleep(10 * time.Millisecond) + _, err = os.Stat(filepath.Join(suite.cache_path, dst)) + } + suite.assert.NoFileExists(filepath.Join(suite.cache_path, dst)) // Dst shall not exists in cache } func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanupWithNoTimeout() { + suite.cleanupTest() + config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 0\n\nloopbackfs:\n path: %s", + suite.cache_path, suite.fake_storage_path) + suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) + defer suite.cleanupTest() src := "source5" @@ -1402,7 +1366,6 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanupWithNoTimeout() { suite.assert.NoFileExists(suite.fake_storage_path + "/" + src) // Src does not exist suite.assert.FileExists(suite.fake_storage_path + "/" + dst) // Dst does exist - time.Sleep(100 * time.Millisecond) // Wait for the cache cleanup to occur // cache should be completely clean suite.assert.False(suite.fileCache.policy.IsCached(filepath.Join(suite.cache_path, src))) suite.assert.False(suite.fileCache.policy.IsCached(filepath.Join(suite.cache_path, dst))) @@ -1414,22 +1377,14 @@ func (suite *fileCacheTestSuite) TestTruncateFileNotInCache() { defer suite.cleanupTest() // Setup path := "file30" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - - _, err := os.Stat(filepath.Join(suite.cache_path, path)) - for i := 0; i < 1000 && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) - _, err = os.Stat(filepath.Join(suite.cache_path, path)) - } - suite.assert.True(os.IsNotExist(err)) + suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) // Path should be in fake storage suite.assert.FileExists(filepath.Join(suite.fake_storage_path, path)) // Chmod size := 1024 - err = 
suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: path, Size: int64(size)}) + err := suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: path, Size: int64(size)}) suite.assert.NoError(err) // Path in fake storage should be updated @@ -1554,10 +1509,6 @@ func (suite *fileCacheTestSuite) TestZZOffloadIO() { func (suite *fileCacheTestSuite) TestZZZZLazyWrite() { defer suite.cleanupTest() - configuration := fmt.Sprintf("file_cache:\n path: %s\n timeout-sec: 0\n\nloopbackfs:\n path: %s", - suite.cache_path, suite.fake_storage_path) - - suite.setupTestHelper(configuration) suite.fileCache.lazyWrite = true file := "file101" @@ -1569,12 +1520,17 @@ func (suite *fileCacheTestSuite) TestZZZZLazyWrite() { // As lazy write is enabled flush shall not upload the file suite.assert.True(handle.Dirty()) + // File is uploaded async on close _ = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - time.Sleep(5 * time.Second) - suite.fileCache.lazyWrite = false + // Wait for the upload + for i := 0; i < 500 && handle.Dirty(); i++ { + time.Sleep(10 * time.Millisecond) + } - // As lazy write is enabled flush shall not upload the file suite.assert.False(handle.Dirty()) + + // cleanup + suite.fileCache.lazyWrite = false } func (suite *fileCacheTestSuite) TestStatFS() { @@ -1721,11 +1677,6 @@ func (suite *fileCacheTestSuite) TestHardLimitOnSize() { func (suite *fileCacheTestSuite) TestHandleDataChange() { defer suite.cleanupTest() - // Configure to create empty files so we create the file in cloud storage - createEmptyFile := true - config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n create-empty-file: %t\n timeout-sec: 1000\n refresh-sec: 10\n\nloopbackfs:\n path: %s", - suite.cache_path, createEmptyFile, suite.fake_storage_path) - suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file43" err := os.WriteFile(suite.fake_storage_path+"/"+path, 
[]byte("test data"), 0777) @@ -1746,7 +1697,6 @@ func (suite *fileCacheTestSuite) TestHandleDataChange() { suite.assert.Equal(9, n) err = suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: f}) suite.assert.NoError(err) - } // In order for 'go test' to run this suite, we need to create diff --git a/component/file_cache/file_cache_windows_test.go b/component/file_cache/file_cache_windows_test.go index ad62f1957..7eb2010ea 100644 --- a/component/file_cache/file_cache_windows_test.go +++ b/component/file_cache/file_cache_windows_test.go @@ -33,7 +33,6 @@ import ( "os" "strings" "testing" - "time" "github.com/Seagate/cloudfuse/common" "github.com/Seagate/cloudfuse/common/config" @@ -61,7 +60,7 @@ func (suite *fileCacheWindowsTestSuite) SetupTest() { rand := randomString(8) suite.cache_path = common.JoinUnixFilepath(home_dir, "file_cache"+rand) suite.fake_storage_path = common.JoinUnixFilepath(home_dir, "fake_storage"+rand) - defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 0\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) + defaultConfig := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: 1\n\nloopbackfs:\n path: %s", suite.cache_path, suite.fake_storage_path) log.Debug(defaultConfig) // Delete the temp directories created @@ -112,15 +111,7 @@ func (suite *fileCacheWindowsTestSuite) TestChownNotInCache() { defer suite.cleanupTest() // Setup path := "file" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - - _, err := os.Stat(suite.cache_path + "/" + path) - for i := 0; i < 1000 && !os.IsNotExist(err); i++ { - time.Sleep(10 * time.Millisecond) - _, err = os.Stat(suite.cache_path + "/" + path) - } - suite.assert.True(os.IsNotExist(err)) + suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) // Path should be in fake storage 
suite.assert.FileExists(suite.fake_storage_path + "/" + path) @@ -128,7 +119,7 @@ func (suite *fileCacheWindowsTestSuite) TestChownNotInCache() { // Checking that nothing changed with existing files owner := os.Getuid() group := os.Getgid() - err = suite.fileCache.Chown(internal.ChownOptions{Name: path, Owner: owner, Group: group}) + err := suite.fileCache.Chown(internal.ChownOptions{Name: path, Owner: owner, Group: group}) suite.assert.Nil(err) // Path in fake storage should be updated diff --git a/component/file_cache/lru_policy_test.go b/component/file_cache/lru_policy_test.go index 59327249a..23a68fced 100644 --- a/component/file_cache/lru_policy_test.go +++ b/component/file_cache/lru_policy_test.go @@ -59,7 +59,7 @@ func (suite *lruPolicyTestSuite) SetupTest() { config := cachePolicyConfig{ tmpPath: cache_path, - cacheTimeout: 0, + cacheTimeout: 1, maxEviction: defaultMaxEviction, maxSizeMB: 0, highThreshold: defaultMaxThreshold, @@ -149,7 +149,7 @@ func (suite *lruPolicyTestSuite) generateNestedDirectory(aPath string) ([]string func (suite *lruPolicyTestSuite) TestDefault() { defer suite.cleanupTest() suite.assert.EqualValues("lru", suite.policy.Name()) - suite.assert.EqualValues(0, suite.policy.cacheTimeout) // cacheTimeout does not change + suite.assert.EqualValues(1, suite.policy.cacheTimeout) // cacheTimeout does not change suite.assert.EqualValues(defaultMaxEviction, suite.policy.maxEviction) suite.assert.EqualValues(0, suite.policy.maxSizeMB) suite.assert.EqualValues(defaultMaxThreshold, suite.policy.highThreshold) @@ -170,7 +170,7 @@ func (suite *lruPolicyTestSuite) TestUpdateConfig() { suite.policy.UpdateConfig(config) suite.assert.NotEqualValues(120, suite.policy.cacheTimeout) // cacheTimeout does not change - suite.assert.EqualValues(0, suite.policy.cacheTimeout) // cacheTimeout does not change + suite.assert.EqualValues(1, suite.policy.cacheTimeout) // cacheTimeout does not change suite.assert.EqualValues(100, suite.policy.maxEviction) 
suite.assert.EqualValues(10, suite.policy.maxSizeMB) suite.assert.EqualValues(70, suite.policy.highThreshold) @@ -191,6 +191,18 @@ func (suite *lruPolicyTestSuite) TestCacheValid() { func (suite *lruPolicyTestSuite) TestCacheInvalidate() { defer suite.cleanupTest() + suite.cleanupTest() + config := cachePolicyConfig{ + tmpPath: cache_path, + cacheTimeout: 0, + maxEviction: defaultMaxEviction, + maxSizeMB: 0, + highThreshold: defaultMaxThreshold, + lowThreshold: defaultMinThreshold, + fileLocks: &common.LockMap{}, + } + suite.setupTestHelper(config) + f, _ := os.Create(cache_path + "/temp") f.Close() suite.policy.CacheValid("temp") @@ -203,19 +215,6 @@ func (suite *lruPolicyTestSuite) TestCacheInvalidate() { func (suite *lruPolicyTestSuite) TestCacheInvalidateTimeout() { defer suite.cleanupTest() - suite.cleanupTest() - - config := cachePolicyConfig{ - tmpPath: cache_path, - cacheTimeout: 1, - maxEviction: defaultMaxEviction, - maxSizeMB: 0, - highThreshold: defaultMaxThreshold, - lowThreshold: defaultMinThreshold, - fileLocks: &common.LockMap{}, - } - - suite.setupTestHelper(config) suite.policy.CacheValid("temp") suite.policy.CacheInvalidate("temp") @@ -274,42 +273,19 @@ func (suite *lruPolicyTestSuite) TestIsCachedFalse() { func (suite *lruPolicyTestSuite) TestTimeout() { defer suite.cleanupTest() - suite.cleanupTest() - - config := cachePolicyConfig{ - tmpPath: cache_path, - cacheTimeout: 1, - maxEviction: defaultMaxEviction, - maxSizeMB: 0, - highThreshold: defaultMaxThreshold, - lowThreshold: defaultMinThreshold, - fileLocks: &common.LockMap{}, - } - - suite.setupTestHelper(config) suite.policy.CacheValid("temp") - time.Sleep(3 * time.Second) // Wait for time > cacheTimeout, the file should no longer be cached + // Wait for time > cacheTimeout, the file should no longer be cached + for i := 0; i < 300 && suite.policy.IsCached("temp"); i++ { + time.Sleep(10 * time.Millisecond) + } suite.assert.False(suite.policy.IsCached("temp")) } func (suite 
*lruPolicyTestSuite) TestMaxEvictionDefault() { defer suite.cleanupTest() - suite.cleanupTest() - - config := cachePolicyConfig{ - tmpPath: cache_path, - cacheTimeout: 1, - maxEviction: defaultMaxEviction, - maxSizeMB: 0, - highThreshold: defaultMaxThreshold, - lowThreshold: defaultMinThreshold, - fileLocks: &common.LockMap{}, - } - - suite.setupTestHelper(config) for i := 1; i < 5000; i++ { suite.policy.CacheValid("temp" + fmt.Sprint(i)) From e37f6092e89f306e3af57b28165c6f270ba0fee2 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Mon, 25 Nov 2024 09:57:33 -0700 Subject: [PATCH 018/136] Remove unused deleteEvent channel --- component/file_cache/lru_policy.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index af89d14ea..012de62f4 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -57,9 +57,6 @@ type lruPolicy struct { closeSignal chan int closeSignalValidate chan int - // Channel to contain files that needs to be deleted immediately - deleteEvent chan string - // Channel to contain files that are in use so push them up in lru list validateChan chan string @@ -108,8 +105,6 @@ func (p *lruPolicy) StartPolicy() error { p.closeSignal = make(chan int) p.closeSignalValidate = make(chan int) - - p.deleteEvent = make(chan string, 1000) p.validateChan = make(chan string, 10000) _, err := common.GetUsage(p.tmpPath) @@ -268,11 +263,6 @@ func (p *lruPolicy) clearCache() { for { select { - case name := <-p.deleteEvent: - log.Trace("lruPolicy::Clear-delete") - // we are asked to delete file explicitly - p.deleteItem(name) - case <-p.cacheTimeoutMonitor: log.Trace("lruPolicy::Clear-timeout monitor") // File cache timeout has hit so delete all unused files for past N seconds From 63f412d2458e5efd47006c9ebe09c10760f3d804 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Mon, 25 Nov 2024 22:56:52 -0700 Subject: [PATCH 019/136] Use 
cacheTimeoutMonitor even when timeout is zero. When timeout is zero, evict everything. --- component/file_cache/lru_policy.go | 40 ++++++++++++++---------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index ce003c402..8f9cb837e 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -125,8 +125,8 @@ func (p *lruPolicy) StartPolicy() error { log.Info("lruPolicy::StartPolicy : Policy set with %v timeout", p.cacheTimeout) - // if timeout is zero time.Tick will return nil - p.cacheTimeoutMonitor = time.Tick(time.Duration(time.Duration(p.cacheTimeout) * time.Second)) + // run the timeout monitor even with timeout set to zero + p.cacheTimeoutMonitor = time.Tick(time.Duration(time.Duration(max(p.cacheTimeout, 1)) * time.Second)) go p.clearCache() go p.asyncCacheValid() @@ -243,18 +243,7 @@ func (p *lruPolicy) cacheValidate(name string) { if node == p.head { return } - // remove node from its current position - if node.next != nil { - node.next.prev = node.prev - } - if node.prev != nil { - node.prev.next = node.next - } - // set node as head - node.prev = nil - node.next = p.head - p.head.prev = node - p.head = node + p.moveToHead(node) node.usage++ } @@ -343,8 +332,22 @@ func (p *lruPolicy) updateMarker() { log.Trace("lruPolicy::updateMarker") p.Lock() - node := p.lastMarker - // remove lastMarker from linked list + p.moveToHead(p.lastMarker) + // evict everything when timeout is zero + if p.cacheTimeout == 0 { + p.moveToHead(p.currMarker) + } else { + // swap lastMarker with currMarker + swap := p.lastMarker + p.lastMarker = p.currMarker + p.currMarker = swap + } + + p.Unlock() +} + +func (p *lruPolicy) moveToHead(node *lruNode) { + // remove the node from its position if node.next != nil { node.next.prev = node.prev } @@ -356,11 +359,6 @@ func (p *lruPolicy) updateMarker() { node.next = p.head p.head.prev = node p.head = node - // swap 
lastMarker with currMarker - p.lastMarker = p.currMarker - p.currMarker = node - - p.Unlock() } func (p *lruPolicy) deleteExpiredNodes() { From 3e71a0fda138371f3e382178fa3a24cb5aafe410 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Mon, 25 Nov 2024 23:42:18 -0700 Subject: [PATCH 020/136] Do not use CachePurge for eviction, even when timeout is zero. Drop CacheInvalidate function. --- component/file_cache/cache_policy.go | 5 ++--- component/file_cache/file_cache.go | 14 ------------ component/file_cache/file_cache_test.go | 2 +- component/file_cache/lru_policy.go | 21 ++++------------- component/file_cache/lru_policy_test.go | 30 ++----------------------- 5 files changed, 9 insertions(+), 63 deletions(-) diff --git a/component/file_cache/cache_policy.go b/component/file_cache/cache_policy.go index 58f5a736b..32c037371 100644 --- a/component/file_cache/cache_policy.go +++ b/component/file_cache/cache_policy.go @@ -56,9 +56,8 @@ type cachePolicy interface { UpdateConfig(cachePolicyConfig) error - CacheValid(name string) // Mark the file as hit - CacheInvalidate(name string) // Invalidate the file - CachePurge(name string) // Schedule the file for deletion + CacheValid(name string) // Mark the file as hit + CachePurge(name string) // Schedule the file for deletion IsCached(name string) bool // Whether or not the cache policy considers this file cached diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index ff016f481..35e1aa3be 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -652,12 +652,6 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { fc.policy.CachePurge(directoriesToPurge[i]) } - if fc.cacheTimeout == 0 { - // delete destination path immediately - log.Info("FileCache::RenameDir : Timeout is zero, so removing local destination %s", options.Dst) - go fc.invalidateDirectory(options.Dst) - } - return nil } @@ -1006,8 +1000,6 @@ func (fc *FileCache) 
closeFileInternal(options internal.CloseFileOptions, flock // if file has not been interactively read or written to by end user, then there is no cached file to close. _, noCachedHandle := options.Handle.GetValue("openFileOptions") - localPath := filepath.Join(fc.tmpPath, options.Handle.Path) - err := fc.FlushFile(internal.FlushFileOptions{Handle: options.Handle, CloseInProgress: true}) //nolint if err != nil { log.Err("FileCache::closeFileInternal : failed to flush file %s", options.Handle.Path) @@ -1044,7 +1036,6 @@ func (fc *FileCache) closeFileInternal(options internal.CloseFileOptions, flock return nil } - fc.policy.CacheInvalidate(localPath) // Invalidate the file from the local cache if the timeout is zero. return nil } @@ -1395,11 +1386,6 @@ func (fc *FileCache) renameCachedFile(localSrcPath string, localDstPath string) // delete the source from our cache policy // this will also delete the source file from local storage (if rename failed) fc.policy.CachePurge(localSrcPath) - - if fc.cacheTimeout == 0 { - // Destination file needs to be deleted immediately - go fc.policy.CachePurge(localDstPath) - } } // TruncateFile: Update the file with its new size. 
diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index 761896049..d96d0fd70 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -1402,7 +1402,7 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanupWithNoTimeout() { suite.assert.NoFileExists(suite.fake_storage_path + "/" + src) // Src does not exist suite.assert.FileExists(suite.fake_storage_path + "/" + dst) // Dst does exist - time.Sleep(100 * time.Millisecond) // Wait for the cache cleanup to occur + time.Sleep(200 * time.Millisecond) // Wait for the cache cleanup to occur // cache should be completely clean suite.assert.False(suite.fileCache.policy.IsCached(filepath.Join(suite.cache_path, src))) suite.assert.False(suite.fileCache.policy.IsCached(filepath.Join(suite.cache_path, dst))) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index 8f9cb837e..a30cc79db 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -75,7 +75,8 @@ type lruPolicy struct { const ( // Check for disk usage in below number of minutes - DiskUsageCheckInterval = 1 + DiskUsageCheckInterval = 1 + minimumEvictionInterval = 100 * time.Millisecond ) var _ cachePolicy = &lruPolicy{} @@ -126,7 +127,8 @@ func (p *lruPolicy) StartPolicy() error { log.Info("lruPolicy::StartPolicy : Policy set with %v timeout", p.cacheTimeout) // run the timeout monitor even with timeout set to zero - p.cacheTimeoutMonitor = time.Tick(time.Duration(time.Duration(max(p.cacheTimeout, 1)) * time.Second)) + timeoutInterval := time.Duration(p.cacheTimeout) * time.Second + p.cacheTimeoutMonitor = time.Tick(max(timeoutInterval, minimumEvictionInterval)) go p.clearCache() go p.asyncCacheValid() @@ -161,21 +163,6 @@ func (p *lruPolicy) CacheValid(name string) { } } -func (p *lruPolicy) CacheInvalidate(name string) { - log.Trace("lruPolicy::CacheInvalidate : %s", name) - - // We check if the file is 
not in the nodeMap to deal with the case - // where timeout is 0 and there are multiple handles open to the file. - // When the first close comes, we will remove the entry from the map - // and attempt to delete the file. This deletion will fail (and be skipped) - // since there are other open handles. When the last close comes in, the map - // will be clean so we we need to try deleting the file. - _, found := p.nodeMap.Load(name) - if p.cacheTimeout == 0 || !found { - p.CachePurge(name) - } -} - func (p *lruPolicy) CachePurge(name string) { log.Trace("lruPolicy::CachePurge : %s", name) diff --git a/component/file_cache/lru_policy_test.go b/component/file_cache/lru_policy_test.go index 43dae3826..4e574ded5 100644 --- a/component/file_cache/lru_policy_test.go +++ b/component/file_cache/lru_policy_test.go @@ -189,21 +189,8 @@ func (suite *lruPolicyTestSuite) TestCacheValid() { suite.assert.EqualValues(1, node.usage) } -func (suite *lruPolicyTestSuite) TestCacheInvalidate() { - defer suite.cleanupTest() - f, _ := os.Create(cache_path + "/temp") - f.Close() - suite.policy.CacheValid("temp") - suite.policy.CacheInvalidate("temp") // this is equivalent to purge since timeout=0 - - n, ok := suite.policy.nodeMap.Load("temp") - suite.assert.False(ok) - suite.assert.Nil(n) -} - -func (suite *lruPolicyTestSuite) TestCacheInvalidateTimeout() { +func (suite *lruPolicyTestSuite) TestCachePurge() { defer suite.cleanupTest() - suite.cleanupTest() config := cachePolicyConfig{ tmpPath: cache_path, @@ -213,23 +200,10 @@ func (suite *lruPolicyTestSuite) TestCacheInvalidateTimeout() { highThreshold: defaultMaxThreshold, lowThreshold: defaultMinThreshold, fileLocks: &common.LockMap{}, + policyTrace: true, } - suite.setupTestHelper(config) - suite.policy.CacheValid("temp") - suite.policy.CacheInvalidate("temp") - - n, ok := suite.policy.nodeMap.Load("temp") - suite.assert.True(ok) - suite.assert.NotNil(n) - node := n.(*lruNode) - suite.assert.EqualValues("temp", node.name) - 
suite.assert.EqualValues(1, node.usage) -} - -func (suite *lruPolicyTestSuite) TestCachePurge() { - defer suite.cleanupTest() // test policy cache data suite.policy.CacheValid("temp") suite.policy.CachePurge("temp") From 5d2e6afe5e952a4857d01305d4f75d4602b89969 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 26 Nov 2024 00:00:08 -0700 Subject: [PATCH 021/136] fixup --- component/file_cache/lru_policy_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/component/file_cache/lru_policy_test.go b/component/file_cache/lru_policy_test.go index 4e574ded5..e3c50af54 100644 --- a/component/file_cache/lru_policy_test.go +++ b/component/file_cache/lru_policy_test.go @@ -200,7 +200,6 @@ func (suite *lruPolicyTestSuite) TestCachePurge() { highThreshold: defaultMaxThreshold, lowThreshold: defaultMinThreshold, fileLocks: &common.LockMap{}, - policyTrace: true, } suite.setupTestHelper(config) From 2b629864f9edc2811b3524c4046c7895489303ca Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 26 Nov 2024 14:11:33 -0700 Subject: [PATCH 022/136] Use fileLocks to keep file info coherent --- component/file_cache/file_cache.go | 66 ++++++++++++++++++------------ 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index e48739cf0..506b6b643 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -442,7 +442,11 @@ func (fc *FileCache) invalidateDirectory(name string) { if err == nil && d != nil { if !d.IsDir() { log.Debug("FileCache::invalidateDirectory : removing file %s from cache", path) + objPath := common.JoinUnixFilepath(name, d.Name()) + flock := fc.fileLocks.Get(objPath) + flock.Lock() fc.policy.CachePurge(path) + flock.Unlock() } else { // remember to delete the directory later (after its children) directoriesToPurge = append(directoriesToPurge, path) @@ -541,14 +545,19 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) 
([]*internal.O if token == "" { for _, entry := range dirents { entryPath := common.JoinUnixFilepath(options.Name, entry.Name()) - if !entry.IsDir() && !fc.fileLocks.Locked(entryPath) { + if !entry.IsDir() { // This is an overhead for streamdir for now // As list is paginated we have no way to know whether this particular item exists both in local cache // and container or not. So we rely on getAttr to tell if entry was cached then it exists in cloud storage too // If entry does not exists on storage then only return a local item here. _, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: entryPath}) if err != nil && (err == syscall.ENOENT || os.IsNotExist(err)) { - info, err := entry.Info() // Grab local cache attributes + // get the lock on the file, to allow any pending operation to complete + flock := fc.fileLocks.Get(entryPath) + flock.Lock() + // use os.Stat instead of entry.Info() to be sure we get good info (with flock locked) + info, err := os.Stat(filepath.Join(localPath, entry.Name())) // Grab local cache attributes + flock.Unlock() // If local file is not locked then only use its attributes otherwise rely on container attributes if err == nil { // Case 2 (file only in local cache) so create a new attributes and add them to the storage attributes @@ -621,7 +630,13 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { newPath := strings.Replace(path, localSrcPath, localDstPath, 1) if !d.IsDir() { log.Debug("FileCache::RenameDir : Renaming local file %s -> %s", path, newPath) + srcFlock := fc.fileLocks.Get(path) + dstFlock := fc.fileLocks.Get(newPath) + srcFlock.Lock() + dstFlock.Lock() fc.renameCachedFile(path, newPath) + srcFlock.Unlock() + dstFlock.Unlock() } else { log.Debug("FileCache::RenameDir : Creating local destination directory %s", newPath) // create the new directory @@ -755,7 +770,6 @@ func (fc *FileCache) validateStorageError(path string, err error, method string, return nil } -// DeleteFile: Invalidate 
the file in local cache. func (fc *FileCache) DeleteFile(options internal.DeleteFileOptions) error { log.Trace("FileCache::DeleteFile : name=%s", options.Name) @@ -1021,12 +1035,6 @@ func (fc *FileCache) closeFileInternal(options internal.CloseFileOptions, flock if options.Handle.Fsynced() { log.Trace("FileCache::closeFileInternal : fsync/sync op, purging %s", options.Handle.Path) localPath := filepath.Join(fc.tmpPath, options.Handle.Path) - - err = deleteFile(localPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::closeFileInternal : failed to delete local file %s [%s]", localPath, err.Error()) - } - fc.policy.CachePurge(localPath) return nil } @@ -1168,6 +1176,11 @@ func (fc *FileCache) FlushFile(options internal.FlushFileOptions) error { //defer exectime.StatTimeCurrentBlock("FileCache::FlushFile")() log.Trace("FileCache::FlushFile : handle=%d, path=%s", options.Handle.ID, options.Handle.Path) + // lock the file state + flock := fc.fileLocks.Get(options.Handle.Path) + flock.Lock() + defer flock.Unlock() + // The file should already be in the cache since CreateFile/OpenFile was called before and a shared lock was acquired. localPath := filepath.Join(fc.tmpPath, options.Handle.Path) fc.policy.CacheValid(localPath) @@ -1307,28 +1320,20 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // To cover cases 2 and 3, grab the attributes from the local cache localPath := filepath.Join(fc.tmpPath, options.Name) + flock := fc.fileLocks.Get(options.Name) + flock.Lock() info, err := os.Stat(localPath) + flock.Unlock() // All directory operations are guaranteed to be synced with storage so they cannot be in a case 2 or 3 state. if err == nil && !info.IsDir() { if exists { // Case 3 (file in cloud storage and in local cache) so update the relevant attributes - // Return from local cache only if file is not under download or deletion - // If file is under download then taking size or mod time from it will be incorrect. 
- if !fc.fileLocks.Locked(options.Name) { - log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) - attrs.Size = info.Size() - attrs.Mtime = info.ModTime() - } else { - log.Debug("FileCache::GetAttr : %s is locked, use storage attributes", options.Name) - } + log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) + attrs.Size = info.Size() + attrs.Mtime = info.ModTime() } else { // Case 2 (file only in local cache) so create a new attributes and add them to the storage attributes - if !strings.Contains(localPath, fc.tmpPath) { - // Here if the path is going out of the temp directory then return ENOENT - exists = false - } else { - log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name) - exists = true - attrs = newObjAttr(options.Name, info) - } + log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name) + exists = true + attrs = newObjAttr(options.Name, info) } } @@ -1370,6 +1375,7 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error { return nil } +// both src and dst file locks should be locked before calling renameCachedFile func (fc *FileCache) renameCachedFile(localSrcPath string, localDstPath string) { err := os.Rename(localSrcPath, localDstPath) if err != nil { @@ -1448,6 +1454,10 @@ func (fc *FileCache) TruncateFile(options internal.TruncateFileOptions) error { func (fc *FileCache) Chmod(options internal.ChmodOptions) error { log.Trace("FileCache::Chmod : Change mode of path %s", options.Name) + flock := fc.fileLocks.Get(options.Name) + flock.Lock() + defer flock.Unlock() + // Update the file in cloud storage err := fc.NextComponent().Chmod(options) err = fc.validateStorageError(options.Name, err, "Chmod", false) @@ -1482,6 +1492,10 @@ func (fc *FileCache) Chmod(options internal.ChmodOptions) error { func (fc *FileCache) Chown(options internal.ChownOptions) error { log.Trace("FileCache::Chown : Change owner of path %s", options.Name) + flock 
:= fc.fileLocks.Get(options.Name) + flock.Lock() + defer flock.Unlock() + // Update the file in cloud storage err := fc.NextComponent().Chown(options) err = fc.validateStorageError(options.Name, err, "Chown", false) From 77206e8b30cb9789a7bba94f3eaece654416cff7 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 26 Nov 2024 23:24:16 -0700 Subject: [PATCH 023/136] Remove locks in Chmod & FlushFile to avoid deadlock --- component/file_cache/file_cache.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 506b6b643..9002ea26e 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -1176,11 +1176,6 @@ func (fc *FileCache) FlushFile(options internal.FlushFileOptions) error { //defer exectime.StatTimeCurrentBlock("FileCache::FlushFile")() log.Trace("FileCache::FlushFile : handle=%d, path=%s", options.Handle.ID, options.Handle.Path) - // lock the file state - flock := fc.fileLocks.Get(options.Handle.Path) - flock.Lock() - defer flock.Unlock() - // The file should already be in the cache since CreateFile/OpenFile was called before and a shared lock was acquired. localPath := filepath.Join(fc.tmpPath, options.Handle.Path) fc.policy.CacheValid(localPath) @@ -1454,10 +1449,6 @@ func (fc *FileCache) TruncateFile(options internal.TruncateFileOptions) error { func (fc *FileCache) Chmod(options internal.ChmodOptions) error { log.Trace("FileCache::Chmod : Change mode of path %s", options.Name) - flock := fc.fileLocks.Get(options.Name) - flock.Lock() - defer flock.Unlock() - // Update the file in cloud storage err := fc.NextComponent().Chmod(options) err = fc.validateStorageError(options.Name, err, "Chmod", false) From 383e8bf2e6c15c8f2d72fec12466ee302ca78090 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Tue, 26 Nov 2024 23:33:48 -0700 Subject: [PATCH 024/136] Use object paths when locking files in RenameDir. 
--- component/file_cache/file_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 9002ea26e..d02a17001 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -630,8 +630,8 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { newPath := strings.Replace(path, localSrcPath, localDstPath, 1) if !d.IsDir() { log.Debug("FileCache::RenameDir : Renaming local file %s -> %s", path, newPath) - srcFlock := fc.fileLocks.Get(path) - dstFlock := fc.fileLocks.Get(newPath) + srcFlock := fc.fileLocks.Get(common.JoinUnixFilepath(options.Src, path)) + dstFlock := fc.fileLocks.Get(common.JoinUnixFilepath(options.Dst, path)) srcFlock.Lock() dstFlock.Lock() fc.renameCachedFile(path, newPath) From 965676c3456d031f5dca2fe466016436c28943fa Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 00:04:47 -0700 Subject: [PATCH 025/136] Close loopback file handles to avoid issues (especially on Windows). 
--- component/file_cache/file_cache_linux_test.go | 6 ++- component/file_cache/file_cache_test.go | 37 ++++++++++++------- .../file_cache/file_cache_windows_test.go | 3 +- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/component/file_cache/file_cache_linux_test.go b/component/file_cache/file_cache_linux_test.go index 195ca0da6..dc95af9af 100644 --- a/component/file_cache/file_cache_linux_test.go +++ b/component/file_cache/file_cache_linux_test.go @@ -113,7 +113,8 @@ func (suite *fileCacheLinuxTestSuite) TestChmodNotInCache() { defer suite.cleanupTest() // Setup - create file directly in fake storage path := "file33" - suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // Path should be in fake storage suite.assert.FileExists(suite.fake_storage_path + "/" + path) @@ -198,7 +199,8 @@ func (suite *fileCacheLinuxTestSuite) TestChownNotInCache() { defer suite.cleanupTest() // Setup path := "file36" - suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // Path should be in fake storage suite.assert.FileExists(suite.fake_storage_path + "/" + path) diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index 5ab5d299b..7fcd76a52 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -396,9 +396,12 @@ func (suite *fileCacheTestSuite) TestStreamDirCase1() { // Create files directly in "fake_storage" suite.loopback.CreateDir(internal.CreateDirOptions{Name: name, Mode: 0777}) suite.loopback.CreateDir(internal.CreateDirOptions{Name: subdir, Mode: 0777}) - 
suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1}) - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2}) - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) + handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) + handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // Read the Directory dir, _, err := suite.fileCache.StreamDir(internal.StreamDirOptions{Name: name}) @@ -455,9 +458,12 @@ func (suite *fileCacheTestSuite) TestStreamDirCase3() { suite.fileCache.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777}) suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file3, Size: 1024}) // Create the files in fake_storage and simulate different sizes - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0 - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2, Mode: 0777}) - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0 + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) + handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file2, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) + handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // Read the Directory dir, _, err := suite.fileCache.StreamDir(internal.StreamDirOptions{Name: name}) @@ -502,10 +508,12 @@ func (suite 
*fileCacheTestSuite) TestStreamDirMixed() { suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file3, Size: 1024}) // Create the files in fake_storage and simulate different sizes - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0 - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777}) - - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file4, Mode: 0777}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: file1, Mode: 0777}) // Length is default 0 + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) + handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file3, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) + handle, _ = suite.loopback.CreateFile(internal.CreateFileOptions{Name: file4, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file4, Size: 1024}) suite.fileCache.TruncateFile(internal.TruncateFileOptions{Name: file4, Size: 0}) @@ -1141,7 +1149,8 @@ func (suite *fileCacheTestSuite) TestGetAttrCase1() { // Setup file := "file24" // Create files directly in "fake_storage" - suite.loopback.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: file, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // Read the Directory attr, err := suite.fileCache.GetAttr(internal.GetAttrOptions{Name: file}) @@ -1244,7 +1253,8 @@ func (suite *fileCacheTestSuite) TestRenameFileNotInCache() { // Setup src := "source1" dst := "destination1" - suite.loopback.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0777}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // 
Path should be in fake storage suite.assert.FileExists(filepath.Join(suite.fake_storage_path, src)) @@ -1378,7 +1388,8 @@ func (suite *fileCacheTestSuite) TestTruncateFileNotInCache() { defer suite.cleanupTest() // Setup path := "file30" - suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // Path should be in fake storage suite.assert.FileExists(filepath.Join(suite.fake_storage_path, path)) diff --git a/component/file_cache/file_cache_windows_test.go b/component/file_cache/file_cache_windows_test.go index 7eb2010ea..7ed2e754a 100644 --- a/component/file_cache/file_cache_windows_test.go +++ b/component/file_cache/file_cache_windows_test.go @@ -111,7 +111,8 @@ func (suite *fileCacheWindowsTestSuite) TestChownNotInCache() { defer suite.cleanupTest() // Setup path := "file" - suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + handle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: path, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: handle}) // Path should be in fake storage suite.assert.FileExists(suite.fake_storage_path + "/" + path) From 4c7794150f49979ae673f83db5ba24c2b04b3dca Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 00:45:28 -0700 Subject: [PATCH 026/136] Prevent attribute data corruption --- component/file_cache/file_cache.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 35e1aa3be..253332b77 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -528,8 +528,13 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O info, err := dirent.Info() if err == nil { - attr.Mtime = info.ModTime() - 
attr.Size = info.Size() + // attr is a pointer returned by NextComponent + // modifying attr could corrupt cached directory listings + // to update properties, we need to make a deep copy first + newAttr := *attr + newAttr.Mtime = info.ModTime() + newAttr.Size = info.Size() + attrs[i] = &newAttr } } i++ @@ -1320,8 +1325,13 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // If file is under download then taking size or mod time from it will be incorrect. if !fc.fileLocks.Locked(options.Name) { log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) - attrs.Size = info.Size() - attrs.Mtime = info.ModTime() + // attrs is a pointer returned by NextComponent + // modifying attrs could corrupt cached directory listings + // to update properties, we need to make a deep copy first + newAttr := *attrs + newAttr.Mtime = info.ModTime() + newAttr.Size = info.Size() + attrs = &newAttr } else { log.Debug("FileCache::GetAttr : %s is locked, use storage attributes", options.Name) } From 60321b57b74349fa2f8385ccef03d5a489e2068e Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 01:32:20 -0700 Subject: [PATCH 027/136] Fix remaining race conditions detected in file cache --- component/file_cache/file_cache.go | 4 ++++ component/file_cache/lru_policy.go | 36 +++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 253332b77..ef797b8fd 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -1063,7 +1063,9 @@ func (fc *FileCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, er // Read and write operations are very frequent so updating cache policy for every read is a costly operation // Update cache policy every 1K operations (includes both read and write) instead + options.Handle.Lock() options.Handle.OptCnt++ + options.Handle.Unlock() if 
(options.Handle.OptCnt % defaultCacheUpdateCount) == 0 { localPath := filepath.Join(fc.tmpPath, options.Handle.Path) fc.policy.CacheValid(localPath) @@ -1110,7 +1112,9 @@ func (fc *FileCache) WriteFile(options internal.WriteFileOptions) (int, error) { // Read and write operations are very frequent so updating cache policy for every read is a costly operation // Update cache policy every 1K operations (includes both read and write) instead + options.Handle.Lock() options.Handle.OptCnt++ + options.Handle.Unlock() if (options.Handle.OptCnt % defaultCacheUpdateCount) == 0 { localPath := filepath.Join(fc.tmpPath, options.Handle.Path) fc.policy.CacheValid(localPath) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index a30cc79db..56d5de7a8 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -36,6 +36,7 @@ import ( ) type lruNode struct { + sync.RWMutex next *lruNode prev *lruNode usage int @@ -176,6 +177,8 @@ func (p *lruPolicy) IsCached(name string) bool { val, found := p.nodeMap.Load(name) if found { node := val.(*lruNode) + node.RLock() + defer node.RUnlock() log.Debug("lruPolicy::IsCached : %s, deleted:%t", name, node.deleted) if !node.deleted { return true @@ -221,18 +224,21 @@ func (p *lruPolicy) cacheValidate(name string) { }) node := val.(*lruNode) + // protect node data + node.Lock() + node.deleted = false + node.usage++ + node.Unlock() + + // protect the LRU p.Lock() defer p.Unlock() - node.deleted = false - // put node at head of linked list if node == p.head { return } p.moveToHead(node) - - node.usage++ } // For all other timer based activities we check the stuff here @@ -295,7 +301,9 @@ func (p *lruPolicy) removeNode(name string) { defer p.Unlock() node = val.(*lruNode) + node.Lock() node.deleted = true + node.Unlock() if node == p.head { p.head = node.next @@ -368,7 +376,9 @@ func (p *lruPolicy) deleteExpiredNodes() { for ; node != nil && count < p.maxEviction; node = node.next { 
delItems = append(delItems, node) + node.Lock() node.deleted = true + node.Unlock() count++ } @@ -385,7 +395,10 @@ func (p *lruPolicy) deleteExpiredNodes() { log.Debug("lruPolicy::deleteExpiredNodes : List generated %d items", count) for _, item := range delItems { - if item.deleted { + item.RLock() + restored := !item.deleted + item.RUnlock() + if !restored { p.removeNode(item.name) p.deleteItem(item.name) } @@ -408,15 +421,16 @@ func (p *lruPolicy) deleteItem(name string) { } flock := p.fileLocks.Get(azPath) - if p.fileLocks.Locked(azPath) { - log.Warn("lruPolicy::DeleteItem : File in under download %s", azPath) - p.CacheValid(name) - return - } - flock.Lock() defer flock.Unlock() + // check if the file has been marked valid again after removeNode was called + _, found := p.nodeMap.Load(name) + if found { + log.Warn("lruPolicy::DeleteItem : File marked valid %s", azPath) + return + } + // Check if there are any open handles to this file or not if flock.Count() > 0 { log.Warn("lruPolicy::DeleteItem : File in use %s", name) From ff1536612e561c9492a3e0bee2392c36a2263b60 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 02:04:12 -0700 Subject: [PATCH 028/136] Lock file to protect calls to stat. 
--- component/file_cache/file_cache.go | 55 +++++++++++++++--------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index ef797b8fd..4be55cc76 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -524,9 +524,12 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O j++ } else { // Case 3: Item is in both local cache and cloud - if !attr.IsDir() && !fc.fileLocks.Locked(attr.Path) { - info, err := dirent.Info() - + if !attr.IsDir() { + flock := fc.fileLocks.Get(attr.Path) + flock.Lock() + // use os.Stat instead of entry.Info() to be sure we get good info (with flock locked) + info, err := os.Stat(filepath.Join(localPath, dirent.Name())) // Grab local cache attributes + flock.Unlock() if err == nil { // attr is a pointer returned by NextComponent // modifying attr could corrupt cached directory listings @@ -546,14 +549,19 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O if token == "" { for _, entry := range dirents { entryPath := common.JoinUnixFilepath(options.Name, entry.Name()) - if !entry.IsDir() && !fc.fileLocks.Locked(entryPath) { + if !entry.IsDir() { // This is an overhead for streamdir for now // As list is paginated we have no way to know whether this particular item exists both in local cache // and container or not. So we rely on getAttr to tell if entry was cached then it exists in cloud storage too // If entry does not exists on storage then only return a local item here. 
_, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: entryPath}) if err != nil && (err == syscall.ENOENT || os.IsNotExist(err)) { - info, err := entry.Info() // Grab local cache attributes + // get the lock on the file, to allow any pending operation to complete + flock := fc.fileLocks.Get(entryPath) + flock.Lock() + // use os.Stat instead of entry.Info() to be sure we get good info (with flock locked) + info, err := os.Stat(filepath.Join(localPath, entry.Name())) // Grab local cache attributes + flock.Unlock() // If local file is not locked then only use its attributes otherwise rely on container attributes if err == nil { // Case 2 (file only in local cache) so create a new attributes and add them to the storage attributes @@ -1321,33 +1329,26 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // To cover cases 2 and 3, grab the attributes from the local cache localPath := filepath.Join(fc.tmpPath, options.Name) + flock := fc.fileLocks.Get(options.Name) + // TODO: should we use a RWMutex and use RLock for stat calls? + flock.Lock() info, err := os.Stat(localPath) + flock.Unlock() // All directory operations are guaranteed to be synced with storage so they cannot be in a case 2 or 3 state. if err == nil && !info.IsDir() { if exists { // Case 3 (file in cloud storage and in local cache) so update the relevant attributes - // Return from local cache only if file is not under download or deletion - // If file is under download then taking size or mod time from it will be incorrect. 
- if !fc.fileLocks.Locked(options.Name) { - log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) - // attrs is a pointer returned by NextComponent - // modifying attrs could corrupt cached directory listings - // to update properties, we need to make a deep copy first - newAttr := *attrs - newAttr.Mtime = info.ModTime() - newAttr.Size = info.Size() - attrs = &newAttr - } else { - log.Debug("FileCache::GetAttr : %s is locked, use storage attributes", options.Name) - } + log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) + // attrs is a pointer returned by NextComponent + // modifying attrs could corrupt cached directory listings + // to update properties, we need to make a deep copy first + newAttr := *attrs + newAttr.Mtime = info.ModTime() + newAttr.Size = info.Size() + attrs = &newAttr } else { // Case 2 (file only in local cache) so create a new attributes and add them to the storage attributes - if !strings.Contains(localPath, fc.tmpPath) { - // Here if the path is going out of the temp directory then return ENOENT - exists = false - } else { - log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name) - exists = true - attrs = newObjAttr(options.Name, info) - } + log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name) + exists = true + attrs = newObjAttr(options.Name, info) } } From 7b45c80b2059d268bb01b4cc348e854bb4389aa3 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 03:02:57 -0700 Subject: [PATCH 029/136] Make timeout minimum 1s. Remove zero timeout handling code. Clean up linked list code. Update tests. Update documentation. 
--- component/file_cache/file_cache.go | 7 +-- component/file_cache/file_cache_test.go | 43 +++-------------- component/file_cache/lru_policy.go | 62 +++++++++++-------------- setup/baseConfig.yaml | 2 +- 4 files changed, 38 insertions(+), 76 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 35e1aa3be..4846cc96a 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -119,6 +119,7 @@ const ( defaultMaxThreshold = 80 defaultMinThreshold = 60 defaultFileCacheTimeout = 120 + minimumFileCacheTimeout = 1 defaultCacheUpdateCount = 100 MB = 1024 * 1024 ) @@ -220,9 +221,9 @@ func (c *FileCache) Configure(_ bool) error { c.createEmptyFile = conf.CreateEmptyFile if config.IsSet(compName + ".file-cache-timeout-in-seconds") { - c.cacheTimeout = float64(conf.V1Timeout) + c.cacheTimeout = max(float64(conf.V1Timeout), minimumFileCacheTimeout) } else if config.IsSet(compName + ".timeout-sec") { - c.cacheTimeout = float64(conf.Timeout) + c.cacheTimeout = max(float64(conf.Timeout), minimumFileCacheTimeout) } else { c.cacheTimeout = float64(defaultFileCacheTimeout) } @@ -341,7 +342,7 @@ func (c *FileCache) OnConfigChange() { } c.createEmptyFile = conf.CreateEmptyFile - c.cacheTimeout = float64(conf.Timeout) + c.cacheTimeout = max(float64(conf.Timeout), minimumFileCacheTimeout) c.policyTrace = conf.EnablePolicyTrace c.offloadIO = conf.OffloadIO c.maxCacheSize = conf.MaxSizeMB diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index d96d0fd70..da56a6a27 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -333,7 +333,7 @@ func (suite *fileCacheTestSuite) TestConfigZero() { suite.assert.Equal(suite.fileCache.createEmptyFile, createEmptyFile) suite.assert.Equal(suite.fileCache.allowNonEmpty, allowNonEmptyTemp) - suite.assert.EqualValues(suite.fileCache.cacheTimeout, cacheTimeout) + 
suite.assert.EqualValues(suite.fileCache.cacheTimeout, minimumFileCacheTimeout) suite.assert.Equal(suite.fileCache.cleanupOnStart, cleanupOnStart) } @@ -936,9 +936,8 @@ func (suite *fileCacheTestSuite) TestCloseFile() { func (suite *fileCacheTestSuite) TestCloseFileTimeout() { defer suite.cleanupTest() suite.cleanupTest() // teardown the default file cache generated - cacheTimeout := 1 config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s", - suite.cache_path, cacheTimeout, suite.fake_storage_path) + suite.cache_path, minimumFileCacheTimeout, suite.fake_storage_path) suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file10" @@ -957,7 +956,7 @@ func (suite *fileCacheTestSuite) TestCloseFileTimeout() { // loop until file does not exist - done due to async nature of eviction _, err = os.Stat(filepath.Join(suite.cache_path, path)) - for i := 0; i < (cacheTimeout*300) && !os.IsNotExist(err); i++ { + for i := 0; i < (minimumFileCacheTimeout*300) && !os.IsNotExist(err); i++ { time.Sleep(10 * time.Millisecond) _, err = os.Stat(filepath.Join(suite.cache_path, path)) } @@ -993,9 +992,8 @@ func (suite *fileCacheTestSuite) TestOpenPreventsEviction() { defer suite.cleanupTest() // Setup suite.cleanupTest() // teardown the default file cache generated - cacheTimeout := 1 config := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n timeout-sec: %d\n\nloopbackfs:\n path: %s", - suite.cache_path, cacheTimeout, suite.fake_storage_path) + suite.cache_path, minimumFileCacheTimeout, suite.fake_storage_path) suite.setupTestHelper(config) // setup a new file cache with a custom config (teardown will occur after the test as usual) path := "file12" @@ -1013,7 +1011,7 @@ func (suite *fileCacheTestSuite) TestOpenPreventsEviction() { suite.assert.NoError(err) // wait until file would be evicted (if not for being opened) - time.Sleep(time.Second * 
time.Duration(cacheTimeout*3)) + time.Sleep(time.Second * time.Duration(minimumFileCacheTimeout*3)) // File should still be in cache suite.assert.FileExists(filepath.Join(suite.cache_path, path)) @@ -1382,34 +1380,6 @@ func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanup() { suite.assert.FileExists(suite.cache_path + "/" + dst) // Dst shall not exists in cache } -func (suite *fileCacheTestSuite) TestRenameFileAndCacheCleanupWithNoTimeout() { - defer suite.cleanupTest() - - src := "source5" - dst := "destination5" - handle, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: src, Mode: 0666}) - suite.fileCache.CloseFile(internal.CloseFileOptions{Handle: handle}) - - // Path _might_ be in the file cache - // Path _should_ be in fake storage - suite.assert.FileExists(suite.fake_storage_path + "/" + src) - - // RenameFile - err := suite.fileCache.RenameFile(internal.RenameFileOptions{Src: src, Dst: dst}) - suite.assert.NoError(err) - - // Dst _might_ exist in cache, and the rename should be complete in fake storage - suite.assert.NoFileExists(suite.fake_storage_path + "/" + src) // Src does not exist - suite.assert.FileExists(suite.fake_storage_path + "/" + dst) // Dst does exist - - time.Sleep(200 * time.Millisecond) // Wait for the cache cleanup to occur - // cache should be completely clean - suite.assert.False(suite.fileCache.policy.IsCached(filepath.Join(suite.cache_path, src))) - suite.assert.False(suite.fileCache.policy.IsCached(filepath.Join(suite.cache_path, dst))) - suite.assert.NoFileExists(suite.cache_path + "/" + src) - suite.assert.NoFileExists(suite.cache_path + "/" + dst) -} - func (suite *fileCacheTestSuite) TestTruncateFileNotInCache() { defer suite.cleanupTest() // Setup @@ -1485,9 +1455,8 @@ func (suite *fileCacheTestSuite) TestTruncateFileCase2() { func (suite *fileCacheTestSuite) TestZZMountPathConflict() { defer suite.cleanupTest() - cacheTimeout := 1 configuration := fmt.Sprintf("file_cache:\n path: %s\n offload-io: true\n 
timeout-sec: %d\n\nloopbackfs:\n path: %s", - suite.cache_path, cacheTimeout, suite.fake_storage_path) + suite.cache_path, minimumFileCacheTimeout, suite.fake_storage_path) fileCache := NewFileCacheComponent() config.ReadConfigFromReader(strings.NewReader(configuration)) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index a30cc79db..6c0782775 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -75,8 +75,7 @@ type lruPolicy struct { const ( // Check for disk usage in below number of minutes - DiskUsageCheckInterval = 1 - minimumEvictionInterval = 100 * time.Millisecond + DiskUsageCheckInterval = 1 ) var _ cachePolicy = &lruPolicy{} @@ -126,9 +125,8 @@ func (p *lruPolicy) StartPolicy() error { log.Info("lruPolicy::StartPolicy : Policy set with %v timeout", p.cacheTimeout) - // run the timeout monitor even with timeout set to zero - timeoutInterval := time.Duration(p.cacheTimeout) * time.Second - p.cacheTimeoutMonitor = time.Tick(max(timeoutInterval, minimumEvictionInterval)) + // start the timeout monitor + p.cacheTimeoutMonitor = time.Tick(time.Duration(p.cacheTimeout) * time.Second) go p.clearCache() go p.asyncCacheValid() @@ -230,7 +228,8 @@ func (p *lruPolicy) cacheValidate(name string) { if node == p.head { return } - p.moveToHead(node) + p.extractNode(node) + p.setHead(node) node.usage++ } @@ -297,51 +296,44 @@ func (p *lruPolicy) removeNode(name string) { node = val.(*lruNode) node.deleted = true - if node == p.head { - p.head = node.next - p.head.prev = nil - node.next = nil - return - } - - if node.next != nil { - node.next.prev = node.prev - } - - if node.prev != nil { - node.prev.next = node.next - } - node.prev = nil - node.next = nil + p.extractNode(node) } func (p *lruPolicy) updateMarker() { log.Trace("lruPolicy::updateMarker") p.Lock() - p.moveToHead(p.lastMarker) - // evict everything when timeout is zero - if p.cacheTimeout == 0 { - p.moveToHead(p.currMarker) - } else { - 
// swap lastMarker with currMarker - swap := p.lastMarker - p.lastMarker = p.currMarker - p.currMarker = swap - } + p.extractNode(p.lastMarker) + p.setHead(p.lastMarker) + // swap lastMarker with currMarker + swap := p.lastMarker + p.lastMarker = p.currMarker + p.currMarker = swap p.Unlock() } -func (p *lruPolicy) moveToHead(node *lruNode) { - // remove the node from its position +func (p *lruPolicy) extractNode(node *lruNode) { + // remove the node from its position in the list + + // head case + if node == p.head { + p.head = node.next + } + if node.next != nil { node.next.prev = node.prev } if node.prev != nil { node.prev.next = node.next } - // and insert it at the head + + node.prev = nil + node.next = nil +} + +func (p *lruPolicy) setHead(node *lruNode) { + // insert node at the head node.prev = nil node.next = p.head p.head.prev = node diff --git a/setup/baseConfig.yaml b/setup/baseConfig.yaml index 1a92120a6..431abf162 100644 --- a/setup/baseConfig.yaml +++ b/setup/baseConfig.yaml @@ -114,7 +114,7 @@ file_cache: # Attribute cache related configuration attr_cache: - timeout-sec: