Skip to content

Commit 44ac449

Browse files
committed
native build directories hardlinking file fetcher de-duplicates in-flight download requests
1 parent babfe8f commit 44ac449

File tree

1 file changed

+109
-47
lines changed

1 file changed

+109
-47
lines changed

pkg/cas/hardlinking_file_fetcher.go

Lines changed: 109 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ type hardlinkingFileFetcher struct {
2626

2727
evictionLock sync.Mutex
2828
evictionSet eviction.Set[string]
29+
30+
downloadsLock sync.Mutex
31+
downloads map[string]*download
32+
}
33+
34+
// download tracks a single in-flight fetch of a cache key. Instances
// are stored in hardlinkingFileFetcher.downloads while the fetch is
// running, so that concurrent GetFile calls for the same key can wait
// for its outcome instead of starting a second download.
type download struct {
35+
// wait is closed by the goroutine that performs the download once
// it has finished, successfully or not. Waiters block on it.
wait chan struct{}
36+
// err is the outcome of the download: nil on success. GetFile
// assigns it before closing wait, and waiters only read it after
// receiving from wait.
err error
2937
}
3038

3139
// NewHardlinkingFileFetcher is an adapter for FileFetcher that stores
@@ -43,6 +51,8 @@ func NewHardlinkingFileFetcher(base FileFetcher, cacheDirectory filesystem.Direc
4351
filesSize: map[string]int64{},
4452

4553
evictionSet: evictionSet,
54+
55+
downloads: map[string]*download{},
4656
}
4757
}
4858

@@ -71,59 +81,111 @@ func (ff *hardlinkingFileFetcher) GetFile(ctx context.Context, blobDigest digest
7181
}
7282

7383
// If the file is present in the cache, hardlink it to the destination.
74-
wasMissing := false
75-
ff.filesLock.RLock()
76-
if _, ok := ff.filesSize[key]; ok {
77-
ff.evictionLock.Lock()
78-
ff.evictionSet.Touch(key)
79-
ff.evictionLock.Unlock()
80-
81-
if err := ff.cacheDirectory.Link(path.MustNewComponent(key), directory, name); err == nil {
82-
// Successfully hardlinked the file to its destination.
83-
ff.filesLock.RUnlock()
84-
return nil
85-
} else if !os.IsNotExist(err) {
86-
ff.filesLock.RUnlock()
87-
return util.StatusWrapfWithCode(err, codes.Internal, "Failed to create hardlink to cached file %#v", key)
88-
}
84+
wasMissing, err := ff.tryLinkFromCache(key, directory, name)
85+
if err == nil {
86+
return nil
87+
} else if !wasMissing {
88+
return err
89+
}
8990

90-
// The file was part of the cache, even though it did not
91-
// exist on disk. Some other process may have tampered
92-
// with the cache directory's contents.
93-
wasMissing = true
91+
// A download is required. Let's see if one is already in progress.
92+
ff.downloadsLock.Lock()
93+
d, ok := ff.downloads[key]
94+
if ok {
95+
// A download is already in progress. Wait for it to finish.
96+
ff.downloadsLock.Unlock()
97+
select {
98+
case <-d.wait:
99+
if d.err != nil {
100+
return d.err
101+
}
102+
// The download should have placed the file in the cache.
103+
// Try linking from the cache one more time.
104+
wasMissingAfterWait, errAfterWait := ff.tryLinkFromCache(key, directory, name)
105+
if errAfterWait == nil {
106+
return nil
107+
} else if wasMissingAfterWait {
108+
return util.StatusWrapfWithCode(errAfterWait, codes.Internal, "Failed to link from cache for %#v after download", key)
109+
}
110+
return errAfterWait
111+
case <-ctx.Done():
112+
return util.StatusFromContext(ctx)
113+
}
94114
}
95-
ff.filesLock.RUnlock()
96115

97-
// Download the file at the intended location.
98-
if err := ff.base.GetFile(ctx, blobDigest, directory, name, isExecutable); err != nil {
99-
return err
116+
// Start a new download.
117+
d = &download{wait: make(chan struct{})}
118+
ff.downloads[key] = d
119+
ff.downloadsLock.Unlock()
120+
121+
downloadErr := ff.base.GetFile(ctx, blobDigest, directory, name, isExecutable)
122+
if downloadErr == nil {
123+
// The file was downloaded successfully. Place it into the
124+
// cache, so that successive calls may use it.
125+
ff.filesLock.Lock()
126+
if _, ok := ff.filesSize[key]; !ok {
127+
ff.evictionLock.Lock()
128+
129+
// Remove old files from the cache if necessary.
130+
sizeBytes := blobDigest.GetSizeBytes()
131+
if err := ff.makeSpace(sizeBytes); err != nil {
132+
downloadErr = err
133+
} else {
134+
// Hardlink the file into the cache.
135+
if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) {
136+
downloadErr = util.StatusWrapfWithCode(err, codes.Internal, "Failed to add cached file %#v", key)
137+
} else {
138+
ff.evictionSet.Insert(key)
139+
ff.filesSize[key] = sizeBytes
140+
ff.filesTotalSize += sizeBytes
141+
}
142+
}
143+
ff.evictionLock.Unlock()
144+
} else if wasMissing {
145+
// The file was already part of our bookkeeping,
146+
// but was missing on disk. Repair this by adding
147+
// a link to the newly downloaded file.
148+
if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) {
149+
downloadErr = util.StatusWrapfWithCode(err, codes.Internal, "Failed to repair cached file %#v", key)
150+
}
151+
}
152+
ff.filesLock.Unlock()
100153
}
101154

102-
ff.filesLock.Lock()
103-
defer ff.filesLock.Unlock()
104-
if _, ok := ff.filesSize[key]; !ok {
105-
ff.evictionLock.Lock()
106-
defer ff.evictionLock.Unlock()
155+
// Unblock waiters.
156+
ff.downloadsLock.Lock()
157+
d.err = downloadErr
158+
delete(ff.downloads, key)
159+
ff.downloadsLock.Unlock()
160+
close(d.wait)
107161

108-
// Remove old files from the cache if necessary.
109-
sizeBytes := blobDigest.GetSizeBytes()
110-
if err := ff.makeSpace(sizeBytes); err != nil {
111-
return err
112-
}
162+
return downloadErr
163+
}
113164

114-
// Hardlink the file into the cache.
115-
if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) {
116-
return util.StatusWrapfWithCode(err, codes.Internal, "Failed to add cached file %#v", key)
117-
}
118-
ff.evictionSet.Insert(key)
119-
ff.filesSize[key] = sizeBytes
120-
ff.filesTotalSize += sizeBytes
121-
} else if wasMissing {
122-
// Even though the file is part of our bookkeeping, we
123-
// observed it didn't exist. Repair this inconsistency.
124-
if err := directory.Link(name, ff.cacheDirectory, path.MustNewComponent(key)); err != nil && !os.IsExist(err) {
125-
return util.StatusWrapfWithCode(err, codes.Internal, "Failed to repair cached file %#v", key)
126-
}
165+
// tryLinkFromCache attempts to create a hardlink from the cache to a
166+
// file in the build directory. The first return value is whether the
167+
// file was present in the cache's bookkeeping, but missing on disk.
168+
func (ff *hardlinkingFileFetcher) tryLinkFromCache(key string, directory filesystem.Directory, name path.Component) (bool, error) {
169+
ff.filesLock.RLock()
170+
_, ok := ff.filesSize[key]
171+
ff.filesLock.RUnlock()
172+
if !ok {
173+
return true, os.ErrNotExist
127174
}
128-
return nil
175+
176+
ff.evictionLock.Lock()
177+
ff.evictionSet.Touch(key)
178+
ff.evictionLock.Unlock()
179+
180+
if err := ff.cacheDirectory.Link(path.MustNewComponent(key), directory, name); err == nil {
181+
// Successfully hardlinked the file to its destination.
182+
return false, nil
183+
} else if !os.IsNotExist(err) {
184+
return false, util.StatusWrapfWithCode(err, codes.Internal, "Failed to create hardlink to cached file %#v", key)
185+
}
186+
187+
// The file was part of the cache, even though it did not
188+
// exist on disk. Some other process may have tampered with
189+
// the cache directory's contents.
190+
return true, os.ErrNotExist
129191
}

0 commit comments

Comments
 (0)