Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 76 additions & 19 deletions pkg/cas/hardlinking_file_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type hardlinkingFileFetcher struct {

evictionLock sync.Mutex
evictionSet eviction.Set[string]

downloadsLock sync.Mutex
downloads map[string]<-chan struct{}
}

// NewHardlinkingFileFetcher is an adapter for FileFetcher that stores
Expand All @@ -43,6 +46,8 @@ func NewHardlinkingFileFetcher(base FileFetcher, cacheDirectory filesystem.Direc
filesSize: map[string]int64{},

evictionSet: evictionSet,

downloads: map[string]<-chan struct{}{},
}
}

Expand Down Expand Up @@ -70,29 +75,52 @@ func (ff *hardlinkingFileFetcher) GetFile(ctx context.Context, blobDigest digest
key += "-x"
}

// If the file is present in the cache, hardlink it to the destination.
wasMissing := false
ff.filesLock.RLock()
if _, ok := ff.filesSize[key]; ok {
ff.evictionLock.Lock()
ff.evictionSet.Touch(key)
ff.evictionLock.Unlock()

if err := ff.cacheDirectory.Link(path.MustNewComponent(key), directory, name); err == nil {
// Successfully hardlinked the file to its destination.
ff.filesLock.RUnlock()
for {
// If the file is present in the cache, hardlink it to the destination.
if err := ff.tryLinkFromCache(key, directory, name); err == nil {
return nil
} else if !os.IsNotExist(err) {
ff.filesLock.RUnlock()
return util.StatusWrapfWithCode(err, codes.Internal, "Failed to create hardlink to cached file %#v", key)
return err
}

// The file was part of the cache, even though it did not
// exist on disk. Some other process may have tampered
// with the cache directory's contents.
wasMissing = true
// A download is required. Let's see if one is already in progress.
ff.downloadsLock.Lock()
wait, ok := ff.downloads[key]
if ok {
// A download is already in progress. Wait for it to finish.
ff.downloadsLock.Unlock()
select {
case <-wait:
// Download finished. Loop back to try linking from
// cache. If missing (download failed or other issue),
// we'll attempt a new download.
continue
case <-ctx.Done():
return util.StatusFromContext(ctx)
}
}

// Start a new download.
break
}
newWait := make(chan struct{})
ff.downloads[key] = newWait
ff.downloadsLock.Unlock()

defer func() {
ff.downloadsLock.Lock()
delete(ff.downloads, key)
ff.downloadsLock.Unlock()
close(newWait)
}()

// Check cache again in case another download completed between our initial
// tryLinkFromCache() call and acquiring the download lock.
if err := ff.tryLinkFromCache(key, directory, name); err == nil {
return nil
} else if !os.IsNotExist(err) {
return err
}
ff.filesLock.RUnlock()

// Download the file at the intended location.
if err := ff.base.GetFile(ctx, blobDigest, directory, name, isExecutable); err != nil {
Expand All @@ -118,7 +146,7 @@ func (ff *hardlinkingFileFetcher) GetFile(ctx context.Context, blobDigest digest
ff.evictionSet.Insert(key)
ff.filesSize[key] = sizeBytes
ff.filesTotalSize += sizeBytes
} else if wasMissing {
} else {
// Even though the file is part of our bookkeeping, we
// observed it didn't exist. Repair this inconsistency.
if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) {
Expand All @@ -127,3 +155,32 @@ func (ff *hardlinkingFileFetcher) GetFile(ctx context.Context, blobDigest digest
}
return nil
}

// tryLinkFromCache attempts to create a hardlink from the cache to a
// file in the build directory. It returns os.ErrNotExist if the file
// is not in the cache bookkeeping, or if it was in bookkeeping but
// missing on disk.
func (ff *hardlinkingFileFetcher) tryLinkFromCache(key string, directory filesystem.Directory, name path.Component) error {
ff.filesLock.RLock()
_, ok := ff.filesSize[key]
ff.filesLock.RUnlock()
if !ok {
return os.ErrNotExist
}

ff.evictionLock.Lock()
ff.evictionSet.Touch(key)
ff.evictionLock.Unlock()

if err := ff.cacheDirectory.Link(path.MustNewComponent(key), directory, name); err == nil {
// Successfully hardlinked the file to its destination.
return nil
} else if !os.IsNotExist(err) {
return util.StatusWrapfWithCode(err, codes.Internal, "Failed to create hardlink to cached file %#v", key)
}

// The file was part of the cache, even though it did not
// exist on disk. Some other process may have tampered with
// the cache directory's contents.
return os.ErrNotExist
}
6 changes: 6 additions & 0 deletions pkg/cas/hardlinking_file_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func TestHardlinkingFileFetcher(t *testing.T) {
// Recover from the case where the cache directory gets cleaned
// up by another process. If hardlinking returns ENOENT, we
// should fall back to downloading and reinserting the file.
cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")).
Return(syscall.ENOENT)
cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")).
Return(syscall.ENOENT)
baseFileFetcher.EXPECT().GetFile(ctx, blobDigest1, buildDirectory, path.MustNewComponent("hello.txt"), false)
Expand All @@ -84,6 +86,8 @@ func TestHardlinkingFileFetcher(t *testing.T) {

// The above may happen in multiple threads at the same time.
// EEXIST errors should be ignored in that case.
cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")).
Return(syscall.ENOENT)
cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")).
Return(syscall.ENOENT)
baseFileFetcher.EXPECT().GetFile(ctx, blobDigest1, buildDirectory, path.MustNewComponent("hello.txt"), false)
Expand All @@ -94,6 +98,8 @@ func TestHardlinkingFileFetcher(t *testing.T) {
fileFetcher.GetFile(ctx, blobDigest1, buildDirectory, path.MustNewComponent("hello.txt"), false))

// Errors other than EEXIST should be propagated as usual.
cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")).
Return(syscall.ENOENT)
cacheDirectory.EXPECT().Link(path.MustNewComponent("3-8b1a9953c4611296a827abf8c47804d7-5-x"), buildDirectory, path.MustNewComponent("hello.txt")).
Return(syscall.ENOENT)
baseFileFetcher.EXPECT().GetFile(ctx, blobDigest1, buildDirectory, path.MustNewComponent("hello.txt"), false)
Expand Down