diff --git a/pkg/cas/hardlinking_file_fetcher.go b/pkg/cas/hardlinking_file_fetcher.go index e98a745d..f548fd81 100644 --- a/pkg/cas/hardlinking_file_fetcher.go +++ b/pkg/cas/hardlinking_file_fetcher.go @@ -26,6 +26,9 @@ type hardlinkingFileFetcher struct { evictionLock sync.Mutex evictionSet eviction.Set[string] + + downloadsLock sync.Mutex + downloads map[string]<-chan struct{} } // NewHardlinkingFileFetcher is an adapter for FileFetcher that stores @@ -43,6 +46,8 @@ func NewHardlinkingFileFetcher(base FileFetcher, cacheDirectory filesystem.Direc filesSize: map[string]int64{}, evictionSet: evictionSet, + + downloads: map[string]<-chan struct{}{}, } } @@ -70,29 +75,52 @@ func (ff *hardlinkingFileFetcher) GetFile(ctx context.Context, blobDigest digest key += "-x" } - // If the file is present in the cache, hardlink it to the destination. - wasMissing := false - ff.filesLock.RLock() - if _, ok := ff.filesSize[key]; ok { - ff.evictionLock.Lock() - ff.evictionSet.Touch(key) - ff.evictionLock.Unlock() - - if err := ff.cacheDirectory.Link(path.MustNewComponent(key), directory, name); err == nil { - // Successfully hardlinked the file to its destination. - ff.filesLock.RUnlock() + for { + // If the file is present in the cache, hardlink it to the destination. + if err := ff.tryLinkFromCache(key, directory, name); err == nil { return nil } else if !os.IsNotExist(err) { - ff.filesLock.RUnlock() - return util.StatusWrapfWithCode(err, codes.Internal, "Failed to create hardlink to cached file %#v", key) + return err } - // The file was part of the cache, even though it did not - // exist on disk. Some other process may have tampered - // with the cache directory's contents. - wasMissing = true + // A download is required. Let's see if one is already in progress. + ff.downloadsLock.Lock() + wait, ok := ff.downloads[key] + if ok { + // A download is already in progress. Wait for it to finish. + ff.downloadsLock.Unlock() + select { + case <-wait: + // Download finished. Loop back to try linking from + // cache. If missing (download failed or other issue), + // we'll attempt a new download. + continue + case <-ctx.Done(): + return util.StatusFromContext(ctx) + } + } + + // Start a new download. + break + } + newWait := make(chan struct{}) + ff.downloads[key] = newWait + ff.downloadsLock.Unlock() + + defer func() { + ff.downloadsLock.Lock() + delete(ff.downloads, key) + ff.downloadsLock.Unlock() + close(newWait) + }() + + // Check cache again in case another download completed between our initial + // tryLinkFromCache() call and acquiring the download lock. + if err := ff.tryLinkFromCache(key, directory, name); err == nil { + return nil + } else if !os.IsNotExist(err) { + return err } - ff.filesLock.RUnlock() // Download the file at the intended location. if err := ff.base.GetFile(ctx, blobDigest, directory, name, isExecutable); err != nil { @@ -118,7 +146,7 @@ func (ff *hardlinkingFileFetcher) GetFile(ctx context.Context, blobDigest digest ff.evictionSet.Insert(key) ff.filesSize[key] = sizeBytes ff.filesTotalSize += sizeBytes - } else if wasMissing { + } else { // Even though the file is part of our bookkeeping, we // observed it didn't exist. Repair this inconsistency. if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) { @@ -127,3 +155,32 @@ func (ff *hardlinkingFileFetcher) GetFile(ctx context.Context, blobDigest digest } return nil } + +// tryLinkFromCache attempts to create a hardlink from the cache to a +// file in the build directory. It returns os.ErrNotExist if the file +// is not in the cache bookkeeping, or if it was in bookkeeping but +// missing on disk. +func (ff *hardlinkingFileFetcher) tryLinkFromCache(key string, directory filesystem.Directory, name path.Component) error { + ff.filesLock.RLock() + _, ok := ff.filesSize[key] + ff.filesLock.RUnlock() + if !ok { + return os.ErrNotExist + } + + ff.evictionLock.Lock() + ff.evictionSet.Touch(key) + ff.evictionLock.Unlock() + + if err := ff.cacheDirectory.Link(path.MustNewComponent(key), directory, name); err == nil { + // Successfully hardlinked the file to its destination. + return nil + } else if !os.IsNotExist(err) { + return util.StatusWrapfWithCode(err, codes.Internal, "Failed to create hardlink to cached file %#v", key) + } + + // The file was part of the cache, even though it did not + // exist on disk. Some other process may have tampered with + // the cache directory's contents. + return os.ErrNotExist +} diff --git a/pkg/cas/hardlinking_file_fetcher_test.go b/pkg/cas/hardlinking_file_fetcher_test.go index a2ab6704..953c2656 100644 --- a/pkg/cas/hardlinking_file_fetcher_test.go +++ b/pkg/cas/hardlinking_file_fetcher_test.go @@ -74,6 +74,8 @@ func TestHardlinkingFileFetcher(t *testing.T) { // Recover from the case where the cache directory gets cleaned // up by another process. If hardlinking returns ENOENT, we // should fall back to downloading and reinserting the file. + cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")). + Return(syscall.ENOENT) cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")). Return(syscall.ENOENT) baseFileFetcher.EXPECT().GetFile(ctx, blobDigest1, buildDirectory, path.MustNewComponent("hello.txt"), false) @@ -84,6 +86,8 @@ func TestHardlinkingFileFetcher(t *testing.T) { // The above may happen in multiple threads at the same time. // EEXIST errors should be ignored in that case. + cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")). + Return(syscall.ENOENT) cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")). Return(syscall.ENOENT) baseFileFetcher.EXPECT().GetFile(ctx, blobDigest1, buildDirectory, path.MustNewComponent("hello.txt"), false) @@ -94,6 +98,8 @@ func TestHardlinkingFileFetcher(t *testing.T) { fileFetcher.GetFile(ctx, blobDigest1, buildDirectory, path.MustNewComponent("hello.txt"), false)) // Errors other than EEXIST should be propagated as usual. + cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")). + Return(syscall.ENOENT) cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")). Return(syscall.ENOENT) baseFileFetcher.EXPECT().GetFile(ctx, blobDigest1, buildDirectory, path.MustNewComponent("hello.txt"), false)