156 changes: 109 additions & 47 deletions pkg/cas/hardlinking_file_fetcher.go
@@ -26,6 +26,14 @@ type hardlinkingFileFetcher struct {

evictionLock sync.Mutex
evictionSet eviction.Set[string]

downloadsLock sync.Mutex
downloads map[string]*download
}

type download struct {
wait chan struct{}
Contributor:

nit: It is possible to be more strict here with wait <-chan struct{} and further down use

wait := make(chan struct{})
d = &download{wait: wait}
close(wait)

Contributor Author:
Thanks for the suggestion.
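
For illustration, a minimal sketch of how the stricter field type reads once the nit is applied (names taken from this diff; the close happens wherever the download finishes):

// Sketch only: with a receive-only field, only the code that created the
// bidirectional channel can close it.
type download struct {
	wait <-chan struct{} // closed once the download has finished
	err  error           // valid only after wait has been closed
}

// Where a new download is registered:
wait := make(chan struct{})
d := &download{wait: wait}
// ... and once the download has completed:
close(wait)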

err error
Contributor:
Suggested change:
-	err error
+	// err is available only when wait has been closed.
+	err error

}

// NewHardlinkingFileFetcher is an adapter for FileFetcher that stores
@@ -43,6 +51,8 @@ func NewHardlinkingFileFetcher(base FileFetcher, cacheDirectory filesystem.Direc
filesSize: map[string]int64{},

evictionSet: evictionSet,

downloads: map[string]*download{},
}
}

@@ -71,59 +81,111 @@ func (ff *hardlinkingFileFetcher) GetFile(ctx context.Context, blobDigest digest
}

// If the file is present in the cache, hardlink it to the destination.
wasMissing := false
ff.filesLock.RLock()
if _, ok := ff.filesSize[key]; ok {
ff.evictionLock.Lock()
ff.evictionSet.Touch(key)
ff.evictionLock.Unlock()

if err := ff.cacheDirectory.Link(path.MustNewComponent(key), directory, name); err == nil {
// Successfully hardlinked the file to its destination.
ff.filesLock.RUnlock()
return nil
} else if !os.IsNotExist(err) {
ff.filesLock.RUnlock()
return util.StatusWrapfWithCode(err, codes.Internal, "Failed to create hardlink to cached file %#v", key)
}
wasMissing, err := ff.tryLinkFromCache(key, directory, name)
if err == nil {
return nil
} else if !wasMissing {
return err
}

// The file was part of the cache, even though it did not
// exist on disk. Some other process may have tampered
// with the cache directory's contents.
wasMissing = true
// A download is required. Let's see if one is already in progress.
ff.downloadsLock.Lock()
d, ok := ff.downloads[key]
if ok {
// A download is already in progress. Wait for it to finish.
ff.downloadsLock.Unlock()
select {
case <-d.wait:
if d.err != nil {
return d.err
Member:
This is incorrect. A caller of GetFile() may receive an error that was produced by another invocation. This should not happen, as errors may also include things like context cancelation, etc.

Contributor Author:
Updated to retry if it failed because of context cancelation

Contributor:
nit: bb_storage/pkg/auth/remoteAuthorizer.authorizeSingle also retries but has the same kind of flaw. Consider the case where the timeout is 55s, downloading the file takes 50s, one call comes in every 30s, and the first call is cancelled after 35s. Then all retries for the later requests will also fail, because when the retry starts, the context has less than 50s left.

0s RPC 0
30s RPC 1
45s RPC 0 cancelled, RPC 1 retries
60s RPC 2
85s RPC 1 times out (5s left to download), RPC 2 retries
90s RPC 3
115s RPC 2 times out (15s left to download), RPC 3 retries
120s RPC 4
145s RPC 3 times out (15s left to download), RPC 4 retries
...

One solution is to manually cancel a context.Background() when there are no requests left waiting. I don't know how complex the implementation and tests for this kind of solution will be or if this is just not worth bothering with.
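
For reference, a rough sketch of that idea (all names here are hypothetical, not from this PR; imports of context and sync assumed): run the download on a context detached from any single caller and cancel it only when the last waiter has given up.

// Hypothetical sketch of a detached download whose context is cancelled
// only once no caller is waiting for it anymore.
type sharedDownload struct {
	mu      sync.Mutex
	waiters int
	cancel  context.CancelFunc
	done    chan struct{} // closed when the download goroutine finishes
}

func newSharedDownload(run func(ctx context.Context)) *sharedDownload {
	ctx, cancel := context.WithCancel(context.Background())
	d := &sharedDownload{waiters: 1, cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(d.done)
		run(ctx)
	}()
	return d
}

func (d *sharedDownload) addWaiter() {
	d.mu.Lock()
	d.waiters++
	d.mu.Unlock()
}

// removeWaiter is called when a caller stops waiting, e.g. because its own
// context was cancelled. The last one to leave aborts the download.
func (d *sharedDownload) removeWaiter() {
	d.mu.Lock()
	d.waiters--
	if d.waiters == 0 {
		d.cancel()
	}
	d.mu.Unlock()
}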

Member:
Can't we just eliminate the error propagation logic entirely? Just change that downloads map to:

downloads map[string]<-chan struct{}

Only use that to wait for an existing download to complete.

Contributor Author:
Thanks for the suggestion. Updated to use the new map.
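
A minimal sketch of the accepted shape (assuming ff.downloads becomes map[string]<-chan struct{}; the helper name is made up for illustration): the map only signals completion, and each caller re-checks the cache itself instead of reusing another call's error.

// waitOrStartDownload either waits for an in-flight download of key to
// finish (started == false) or registers a new one and returns a cleanup
// function the caller must run once its own download attempt is done.
func (ff *hardlinkingFileFetcher) waitOrStartDownload(ctx context.Context, key string) (started bool, cleanup func(), err error) {
	ff.downloadsLock.Lock()
	if wait, ok := ff.downloads[key]; ok {
		ff.downloadsLock.Unlock()
		select {
		case <-wait:
			// The other download finished; the caller should retry the
			// cache lookup and, if needed, start its own download.
			return false, nil, nil
		case <-ctx.Done():
			return false, nil, util.StatusFromContext(ctx)
		}
	}
	wait := make(chan struct{})
	ff.downloads[key] = wait
	ff.downloadsLock.Unlock()
	return true, func() {
		ff.downloadsLock.Lock()
		delete(ff.downloads, key)
		ff.downloadsLock.Unlock()
		close(wait)
	}, nil
}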

}
// The download should have placed the file in the cache.
// Try linking from the cache one more time.
wasMissingAfterWait, errAfterWait := ff.tryLinkFromCache(key, directory, name)
if errAfterWait == nil {
return nil
} else if wasMissingAfterWait {
return util.StatusWrapfWithCode(errAfterWait, codes.Internal, "Failed to link from cache for %#v after download", key)
}
return errAfterWait
case <-ctx.Done():
return util.StatusFromContext(ctx)
}
}
ff.filesLock.RUnlock()

// Download the file at the intended location.
if err := ff.base.GetFile(ctx, blobDigest, directory, name, isExecutable); err != nil {
return err
// Start a new download.
d = &download{wait: make(chan struct{})}
ff.downloads[key] = d
ff.downloadsLock.Unlock()
Contributor:
ff.downloadsLock does not necessarily need to be locked when setting d.err as it is only to be read after d.wait has been closed. This means that defer can be used.

Suggested change:
-	d = &download{wait: make(chan struct{})}
-	ff.downloads[key] = d
-	ff.downloadsLock.Unlock()
+	wait := make(chan struct{})
+	d = &download{wait: wait}
+	ff.downloads[key] = d
+	ff.downloadsLock.Unlock()
+	defer func() {
+		ff.downloadsLock.Lock()
+		delete(ff.downloads, key)
+		ff.downloadsLock.Unlock()
+		close(wait)
+	}()

Contributor Author:
Thanks for the suggestion. Updated to use defer. The code is cleaner.


downloadErr := ff.base.GetFile(ctx, blobDigest, directory, name, isExecutable)
if downloadErr == nil {
// The file was downloaded successfully. Place it into the
// cache, so that successive calls may use it.
ff.filesLock.Lock()
if _, ok := ff.filesSize[key]; !ok {
ff.evictionLock.Lock()

// Remove old files from the cache if necessary.
sizeBytes := blobDigest.GetSizeBytes()
if err := ff.makeSpace(sizeBytes); err != nil {
downloadErr = err
} else {
// Hardlink the file into the cache.
if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) {
downloadErr = util.StatusWrapfWithCode(err, codes.Internal, "Failed to add cached file %#v", key)
} else {
ff.evictionSet.Insert(key)
ff.filesSize[key] = sizeBytes
ff.filesTotalSize += sizeBytes
}
}
ff.evictionLock.Unlock()
} else if wasMissing {
// The file was already part of our bookkeeping,
// but was missing on disk. Repair this by adding
// a link to the newly downloaded file.
if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) {
downloadErr = util.StatusWrapfWithCode(err, codes.Internal, "Failed to repair cached file %#v", key)
}
}
ff.filesLock.Unlock()
}

ff.filesLock.Lock()
defer ff.filesLock.Unlock()
if _, ok := ff.filesSize[key]; !ok {
ff.evictionLock.Lock()
defer ff.evictionLock.Unlock()
// Unblock waiters.
ff.downloadsLock.Lock()
d.err = downloadErr
delete(ff.downloads, key)
ff.downloadsLock.Unlock()
close(d.wait)

// Remove old files from the cache if necessary.
sizeBytes := blobDigest.GetSizeBytes()
if err := ff.makeSpace(sizeBytes); err != nil {
return err
}
return downloadErr
}

// Hardlink the file into the cache.
if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) {
return util.StatusWrapfWithCode(err, codes.Internal, "Failed to add cached file %#v", key)
}
ff.evictionSet.Insert(key)
ff.filesSize[key] = sizeBytes
ff.filesTotalSize += sizeBytes
} else if wasMissing {
// Even though the file is part of our bookkeeping, we
// observed it didn't exist. Repair this inconsistency.
if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) {
return util.StatusWrapfWithCode(err, codes.Internal, "Failed to repair cached file %#v", key)
}
// tryLinkFromCache attempts to create a hardlink from the cache to a
// file in the build directory. The first return value is whether the
// file was present in the cache's bookkeeping, but missing on disk.
func (ff *hardlinkingFileFetcher) tryLinkFromCache(key string, directory filesystem.Directory, name path.Component) (bool, error) {
ff.filesLock.RLock()
_, ok := ff.filesSize[key]
ff.filesLock.RUnlock()
if !ok {
return true, os.ErrNotExist
Member:
Is this wasMissing return value really needed? Wouldn't it be sufficient to just call os.IsNotExist(err) at the call site?

Contributor Author:
Thanks for the suggestion. Changed to just call os.IsNotExist(err)
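
For illustration, a sketch of the call site once tryLinkFromCache returns only an error (with os.ErrNotExist still covering both "not tracked" and "missing on disk"):

// Sketch of the GetFile call site after dropping the wasMissing return value.
if err := ff.tryLinkFromCache(key, directory, name); err == nil {
	// Hardlinked from the cache; nothing left to do.
	return nil
} else if !os.IsNotExist(err) {
	return err
}
// os.IsNotExist(err): the file is not usable from the cache, so fall
// through and download it.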

}
return nil

ff.evictionLock.Lock()
ff.evictionSet.Touch(key)
ff.evictionLock.Unlock()

if err := ff.cacheDirectory.Link(path.MustNewComponent(key), directory, name); err == nil {
// Successfully hardlinked the file to its destination.
return false, nil
} else if !os.IsNotExist(err) {
return false, util.StatusWrapfWithCode(err, codes.Internal, "Failed to create hardlink to cached file %#v", key)
}

// The file was part of the cache, even though it did not
// exist on disk. Some other process may have tampered with
// the cache directory's contents.
return true, os.ErrNotExist
}