Skip to content

Commit d290e21

Browse files
turan18 authored and Kern-- committed
Deterministically close open span cache file descriptors
The snapshotter stores fetched spans in a cache either in memory or on disk. When reading from the cache on disk we use a Finalizer construct to close the open file descriptors when the Go garbage collector sees that the fd is no longer being referenced. The issue with this is that we don't have control over when the GC runs (although it's possible), and so the process could hold on to open fds for an unknown amount of time, causing a sort of leak. On systems where the snapshotter is bounded by a ulimit on the number of open files, this can cause the snapshotter span cache get calls to fail, causing `file.Read` failures for the running container/process. This change wraps the readers returned by the cache in `io.ReadCloser`s, so we can deterministically close the files once the content has been read from them. Signed-off-by: Yasin Turan <[email protected]>
1 parent 8c6880c commit d290e21

File tree

2 files changed

+37
-12
lines changed

2 files changed

+37
-12
lines changed

fs/reader/reader.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {
225225
if err != nil {
226226
return 0, fmt.Errorf("failed to read the file: %w", err)
227227
}
228+
defer r.Close()
228229

229230
// TODO this is not the right place for this metric to be. It needs to go down the BlobReader, when the HTTP request is issued
230231
commonmetrics.IncOperationCount(commonmetrics.SynchronousReadRegistryFetchCount, sf.gr.layerSha) // increment the number of on demand file fetches from remote registry
@@ -234,6 +235,7 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {
234235
if err != nil {
235236
return 0, fmt.Errorf("unexpected copied data size for on-demand fetch. read = %d, expected = %d", n, expectedSize)
236237
}
238+
237239
commonmetrics.AddBytesCount(commonmetrics.SynchronousBytesServed, sf.gr.layerSha, int64(n)) // measure the number of bytes served synchronously
238240

239241
return n, nil

fs/span-manager/span_manager.go

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,30 @@ var (
3838
ErrExceedMaxSpan = errors.New("span id larger than max span id")
3939
)
4040

41+
type MultiReaderCloser struct {
42+
c []io.Closer
43+
io.Reader
44+
}
45+
46+
func (mrc *MultiReaderCloser) Close() error {
47+
errs := []error{}
48+
for _, c := range mrc.c {
49+
if err := c.Close(); err != nil {
50+
errs = append(errs, err)
51+
}
52+
}
53+
return errors.Join(errs...)
54+
}
55+
56+
type SectionReaderCloser struct {
57+
c io.Closer
58+
*io.SectionReader
59+
}
60+
61+
func (src *SectionReaderCloser) Close() error {
62+
return src.c.Close()
63+
}
64+
4165
// SpanManager fetches and caches spans of a given layer.
4266
type SpanManager struct {
4367
cache cache.BlobCache
@@ -145,10 +169,11 @@ func (m *SpanManager) resolveSpan(spanID compression.SpanID) error {
145169

146170
// GetContents returns a reader for the requested contents. The contents may be
147171
// across multiple spans.
148-
func (m *SpanManager) GetContents(startUncompOffset, endUncompOffset compression.Offset) (io.Reader, error) {
172+
func (m *SpanManager) GetContents(startUncompOffset, endUncompOffset compression.Offset) (io.ReadCloser, error) {
149173
si := m.getSpanInfo(startUncompOffset, endUncompOffset)
150174
numSpans := si.spanEnd - si.spanStart + 1
151175
spanReaders := make([]io.Reader, numSpans)
176+
spanClosers := make([]io.Closer, numSpans)
152177

153178
eg, _ := errgroup.WithContext(context.Background())
154179
var i compression.SpanID
@@ -161,14 +186,14 @@ func (m *SpanManager) GetContents(startUncompOffset, endUncompOffset compression
161186
return err
162187
}
163188
spanReaders[j] = r
189+
spanClosers[j] = r
164190
return nil
165191
})
166192
}
167193
if err := eg.Wait(); err != nil {
168194
return nil, err
169195
}
170-
171-
return io.MultiReader(spanReaders...), nil
196+
return &MultiReaderCloser{spanClosers, io.MultiReader(spanReaders...)}, nil
172197
}
173198

174199
// getSpanInfo returns spanInfo from the offsets of the requested file
@@ -216,7 +241,7 @@ func (m *SpanManager) getSpanInfo(offsetStart, offsetEnd compression.Offset) *sp
216241
// 3. For `unrequested` span, fetch-uncompress-cache the span data, return the reader
217242
// from the uncompressed span
218243
// 4. No span state lock will be acquired in `requested` state.
219-
func (m *SpanManager) getSpanContent(spanID compression.SpanID, offsetStart, offsetEnd compression.Offset) (io.Reader, error) {
244+
func (m *SpanManager) getSpanContent(spanID compression.SpanID, offsetStart, offsetEnd compression.Offset) (io.ReadCloser, error) {
220245
s := m.spans[spanID]
221246
size := offsetEnd - offsetStart
222247

@@ -240,6 +265,7 @@ func (m *SpanManager) getSpanContent(spanID compression.SpanID, offsetStart, off
240265
if err != nil {
241266
return nil, err
242267
}
268+
defer r.Close()
243269

244270
// read compressed span
245271
compressedBuf, err := io.ReadAll(r)
@@ -260,7 +286,7 @@ func (m *SpanManager) getSpanContent(spanID compression.SpanID, offsetStart, off
260286
if err := s.setState(uncompressed); err != nil {
261287
return nil, err
262288
}
263-
return bytes.NewReader(uncompSpanBuf[offsetStart : offsetStart+size]), nil
289+
return io.NopCloser(bytes.NewReader(uncompSpanBuf[offsetStart : offsetStart+size])), nil
264290
}
265291

266292
// fetch-uncompress-cache span: span state can only be `unrequested` since
@@ -270,7 +296,7 @@ func (m *SpanManager) getSpanContent(spanID compression.SpanID, offsetStart, off
270296
return nil, err
271297
}
272298
buf := bytes.NewBuffer(uncompBuf[offsetStart : offsetStart+size])
273-
return io.Reader(buf), nil
299+
return io.NopCloser(buf), nil
274300
}
275301

276302
// fetchAndCacheSpan fetches a span, uncompresses the span if `uncompress == true`,
@@ -393,18 +419,15 @@ func (m *SpanManager) addSpanToCache(spanID compression.SpanID, contents []byte,
393419
// getSpanFromCache returns the cached span content as an `io.Reader`.
394420
// `offset` is the offset of the requested contents within the span.
395421
// `size` is the size of the requested contents.
396-
func (m *SpanManager) getSpanFromCache(spanID compression.SpanID, offset, size compression.Offset) (io.Reader, error) {
422+
func (m *SpanManager) getSpanFromCache(spanID compression.SpanID, offset, size compression.Offset) (io.ReadCloser, error) {
397423
r, err := m.cache.Get(fmt.Sprintf("%d", spanID))
398424
if err != nil {
399425
return nil, fmt.Errorf("%w: %w", ErrSpanNotAvailable, err)
400426
}
401-
runtime.SetFinalizer(r, func(r cache.Reader) {
402-
r.Close()
403-
})
404-
return io.NewSectionReader(r, int64(offset), int64(size)), nil
427+
return &SectionReaderCloser{r, io.NewSectionReader(r, int64(offset), int64(size))}, nil
405428
}
406429

407-
// verifySpanContents caculates span digest from its compressed bytes, and compare
430+
// verifySpanContents calculates span digest from its compressed bytes, and compare
408431
// with the digest stored in ztoc.
409432
func (m *SpanManager) verifySpanContents(compressedData []byte, spanID compression.SpanID) error {
410433
actual := digest.FromBytes(compressedData)

0 commit comments

Comments
 (0)