Commit fd80bdd

Implement resumable downloads
So if a download gets interrupted, we do not have to start again from scratch.

Signed-off-by: Eric Curtin <eric.curtin@docker.com>
1 parent 6d1c75e commit fd80bdd

File tree

5 files changed: +265, -11 lines


pkg/distribution/distribution/client.go

Lines changed: 47 additions & 0 deletions
@@ -17,6 +17,7 @@ import (
 	"github.com/docker/model-runner/pkg/distribution/registry"
 	"github.com/docker/model-runner/pkg/distribution/tarball"
 	"github.com/docker/model-runner/pkg/distribution/types"
+	"github.com/docker/model-runner/pkg/go-containerregistry/pkg/v1/remote"
 	"github.com/docker/model-runner/pkg/inference/platform"
 )

@@ -140,11 +141,57 @@ func NewClient(opts ...Option) (*Client, error) {
 func (c *Client) PullModel(ctx context.Context, reference string, progressWriter io.Writer) error {
 	c.log.Infoln("Starting model pull:", utils.SanitizeForLog(reference))

+	// First, fetch the remote model to get the manifest
 	remoteModel, err := c.registry.Model(ctx, reference)
 	if err != nil {
 		return fmt.Errorf("reading model from registry: %w", err)
 	}

+	// Check for incomplete downloads and prepare resume offsets
+	layers, err := remoteModel.Layers()
+	if err != nil {
+		return fmt.Errorf("getting layers: %w", err)
+	}
+
+	// Build a map of digest -> resume offset for layers with incomplete downloads
+	resumeOffsets := make(map[string]int64)
+	for _, layer := range layers {
+		digest, err := layer.Digest()
+		if err != nil {
+			c.log.Warnf("Failed to get layer digest: %v", err)
+			continue
+		}
+
+		// Check if there's an incomplete download for this layer (use DiffID for uncompressed models)
+		diffID, err := layer.DiffID()
+		if err != nil {
+			c.log.Warnf("Failed to get layer diffID: %v", err)
+			continue
+		}
+
+		incompleteSize, err := c.store.GetIncompleteSize(diffID)
+		if err != nil {
+			c.log.Warnf("Failed to check incomplete size for layer %s: %v", digest, err)
+			continue
+		}
+
+		if incompleteSize > 0 {
+			c.log.Infof("Found incomplete download for layer %s: %d bytes", digest, incompleteSize)
+			resumeOffsets[digest.String()] = incompleteSize
+		}
+	}
+
+	// If we have any incomplete downloads, create a new context with resume offsets
+	if len(resumeOffsets) > 0 {
+		c.log.Infof("Resuming %d interrupted layer download(s)", len(resumeOffsets))
+		ctx = remote.WithResumeOffsets(ctx, resumeOffsets)
+		// Re-fetch the model with the updated context
+		remoteModel, err = c.registry.Model(ctx, reference)
+		if err != nil {
+			return fmt.Errorf("reading model from registry with resume context: %w", err)
+		}
+	}
+
 	// Check for supported type
 	if err := checkCompat(remoteModel, c.log, reference); err != nil {
 		return err
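For orientation, here is a hedged caller-side sketch of the flow this hunk enables. The import path is inferred from the file tree, the NewClient options are omitted, and the model reference is illustrative; a real caller would configure the client per the package's options.

package main

import (
	"context"
	"log"
	"os"

	// Import path inferred from this repo's file tree (assumption).
	"github.com/docker/model-runner/pkg/distribution/distribution"
)

func main() {
	// Options omitted for brevity; real callers configure the store, logger, etc.
	client, err := distribution.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	// If an earlier pull of this reference was interrupted, PullModel now
	// finds the incomplete blob on disk, records its size per layer digest,
	// and attaches the offsets to the context so the fetcher can send a
	// Range request instead of re-downloading from byte zero.
	// "ai/example-model" is an illustrative reference, not a real one.
	if err := client.PullModel(context.Background(), "ai/example-model", os.Stdout); err != nil {
		log.Fatal(err)
	}
}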

pkg/distribution/internal/progress/reader.go

Lines changed: 13 additions & 0 deletions
@@ -24,6 +24,19 @@ func NewReader(r io.Reader, updates chan<- v1.Update) io.Reader {
 	}
 }

+// NewReaderWithOffset returns a reader that reports progress starting from an initial offset.
+// This is useful for resuming interrupted downloads.
+func NewReaderWithOffset(r io.Reader, updates chan<- v1.Update, initialOffset int64) io.Reader {
+	if updates == nil {
+		return r
+	}
+	return &Reader{
+		Reader:       r,
+		ProgressChan: updates,
+		Total:        initialOffset,
+	}
+}
+
 func (pr *Reader) Read(p []byte) (int, error) {
 	n, err := pr.Reader.Read(p)
 	pr.Total += int64(n)
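The one subtlety here is that Total is seeded with the bytes already on disk, so progress consumers see a cumulative count rather than one that restarts at zero. A self-contained toy illustrating the pattern; the Update and Reader types below are simplified stand-ins for the package's real ones, with an explicit channel send added for the demo.

package main

import (
	"fmt"
	"io"
	"strings"
)

// Update and Reader are simplified stand-ins for the package's types.
type Update struct{ Complete int64 }

type Reader struct {
	io.Reader
	ProgressChan chan<- Update
	Total        int64
}

func (pr *Reader) Read(p []byte) (int, error) {
	n, err := pr.Reader.Read(p)
	if n > 0 {
		pr.Total += int64(n)
		select {
		case pr.ProgressChan <- Update{Complete: pr.Total}:
		default: // drop updates rather than block the download
		}
	}
	return n, err
}

func main() {
	updates := make(chan Update, 8)
	// Resuming after 1000 bytes already on disk: the first progress update
	// reports 1000+len("remaining bytes"), not just the bytes read now.
	r := &Reader{Reader: strings.NewReader("remaining bytes"), ProgressChan: updates, Total: 1000}
	if _, err := io.Copy(io.Discard, r); err != nil {
		panic(err)
	}
	close(updates)
	for u := range updates {
		fmt.Println("complete:", u.Complete) // complete: 1015
	}
}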

pkg/distribution/internal/store/blobs.go

Lines changed: 102 additions & 6 deletions
@@ -78,6 +78,27 @@ type blob interface {
 	Uncompressed() (io.ReadCloser, error)
 }

+// layerWithDigest extends blob to include the Digest method
+type layerWithDigest interface {
+	blob
+	Digest() (v1.Hash, error)
+}
+
+// resumableLayer wraps a layer to add resume support
+type resumableLayer struct {
+	v1.Layer
+	store *LocalStore
+}
+
+func (rl *resumableLayer) Uncompressed() (io.ReadCloser, error) {
+	// For resumable downloads, we need to check for incomplete downloads
+	// and wrap the layer to inject the resume offset into the context.
+	// However, since the HTTP request happens inside the layer's Compressed() call,
+	// we can't easily intercept it here without modifying the layer interface.
+	// For now, we'll rely on the WriteBlob append logic and fetcher Range support.
+	return rl.Layer.Uncompressed()
+}
+
 // writeLayer writes the layer blob to the store.
 // It returns true when a new blob was created and the blob's DiffID.
 func (s *LocalStore) writeLayer(layer blob, updates chan<- v1.Update) (bool, v1.Hash, error) {

@@ -94,13 +115,28 @@ func (s *LocalStore) writeLayer(layer blob, updates chan<- v1.Update) (bool, v1.Hash, error) {
 		return false, hash, nil
 	}

+	// Check if we're resuming an incomplete download
+	incompleteSize, err := s.GetIncompleteSize(hash)
+	if err != nil {
+		return false, v1.Hash{}, fmt.Errorf("check incomplete size: %w", err)
+	}
+
 	lr, err := layer.Uncompressed()
 	if err != nil {
 		return false, v1.Hash{}, fmt.Errorf("get blob contents: %w", err)
 	}
 	defer lr.Close()
-	r := progress.NewReader(lr, updates)

+	// Wrap the reader with progress reporting, accounting for already downloaded bytes
+	var r io.Reader
+	if incompleteSize > 0 {
+		r = progress.NewReaderWithOffset(lr, updates, incompleteSize)
+	} else {
+		r = progress.NewReader(lr, updates)
+	}
+
+	// WriteBlob will handle appending to incomplete files.
+	// The HTTP layer will handle resuming via Range headers.
 	if err := s.WriteBlob(hash, r); err != nil {
 		return false, hash, err
 	}

@@ -109,6 +145,7 @@ func (s *LocalStore) writeLayer(layer blob, updates chan<- v1.Update) (bool, v1.Hash, error) {

 // WriteBlob writes the blob to the store, reporting progress to the given channel.
 // If the blob is already in the store, it is a no-op and the blob is not consumed from the reader.
+// If an incomplete download exists, it will be resumed by appending to the existing file.
 func (s *LocalStore) WriteBlob(diffID v1.Hash, r io.Reader) error {
 	hasBlob, err := s.hasBlob(diffID)
 	if err != nil {

@@ -122,21 +159,61 @@ func (s *LocalStore) WriteBlob(diffID v1.Hash, r io.Reader) error {
 	if err != nil {
 		return fmt.Errorf("get blob path: %w", err)
 	}
-	f, err := createFile(incompletePath(path))
-	if err != nil {
-		return fmt.Errorf("create blob file: %w", err)
+
+	incompletePath := incompletePath(path)
+
+	// Check if we're resuming a partial download
+	var f *os.File
+	var isResume bool
+	if _, err := os.Stat(incompletePath); err == nil {
+		// Resume: open the file in append mode
+		isResume = true
+		f, err = os.OpenFile(incompletePath, os.O_WRONLY|os.O_APPEND, 0666)
+		if err != nil {
+			return fmt.Errorf("open incomplete blob file for resume: %w", err)
+		}
+	} else {
+		// New download: create the file
+		f, err = createFile(incompletePath)
+		if err != nil {
+			return fmt.Errorf("create blob file: %w", err)
+		}
 	}
-	defer os.Remove(incompletePath(path))
 	defer f.Close()

 	if _, err := io.Copy(f, r); err != nil {
+		// Don't delete the incomplete file on error - we want to resume later
 		return fmt.Errorf("copy blob %q to store: %w", diffID.String(), err)
 	}

 	f.Close() // Rename will fail on Windows if the file is still open.
-	if err := os.Rename(incompletePath(path), path); err != nil {
+
+	// For resumed downloads, verify the complete file's hash before finalizing.
+	// (For new downloads, the stream was already verified during download.)
+	if isResume {
+		completeFile, err := os.Open(incompletePath)
+		if err != nil {
+			return fmt.Errorf("open completed file for verification: %w", err)
+		}
+		defer completeFile.Close()
+
+		computedHash, _, err := v1.SHA256(completeFile)
+		if err != nil {
+			return fmt.Errorf("compute hash of completed file: %w", err)
+		}
+
+		if computedHash.String() != diffID.String() {
+			return fmt.Errorf("hash mismatch after download: got %s, want %s", computedHash, diffID)
+		}
+	}
+
+	if err := os.Rename(incompletePath, path); err != nil {
 		return fmt.Errorf("rename blob file: %w", err)
 	}
+
+	// Safety cleanup in case rename didn't remove the source
+	// (os.Rename should already have moved the incomplete file).
+	os.Remove(incompletePath)
 	return nil
 }

@@ -160,6 +237,25 @@ func (s *LocalStore) hasBlob(hash v1.Hash) (bool, error) {
 	return false, nil
 }

+// GetIncompleteSize returns the size of an incomplete blob if it exists, or 0 if it doesn't.
+func (s *LocalStore) GetIncompleteSize(hash v1.Hash) (int64, error) {
+	path, err := s.blobPath(hash)
+	if err != nil {
+		return 0, fmt.Errorf("get blob path: %w", err)
+	}
+
+	incompletePath := incompletePath(path)
+	stat, err := os.Stat(incompletePath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return 0, nil
+		}
+		return 0, fmt.Errorf("stat incomplete file: %w", err)
+	}
+
+	return stat.Size(), nil
+}
+
 // createFile is a wrapper around os.Create that creates any parent directories as needed.
 func createFile(path string) (*os.File, error) {
 	if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
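The core of the resume path is this append-then-verify dance: reopen the incomplete file with O_APPEND, copy the remainder, then hash the whole file before renaming it into place, since the resumed bytes were never verified on the wire. A minimal, self-contained sketch with crypto/sha256 standing in for v1.SHA256 and illustrative file names:

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"io"
	"os"
	"strings"
)

func main() {
	incomplete := "blob.incomplete" // illustrative name; the store derives its own
	final := "blob"                 // illustrative final path
	defer os.Remove(incomplete)
	defer os.Remove(final)

	// First attempt wrote a prefix before being interrupted.
	if err := os.WriteFile(incomplete, []byte("hello, "), 0o666); err != nil {
		panic(err)
	}

	// Resume: append only the remaining bytes, as WriteBlob does with
	// os.O_WRONLY|os.O_APPEND.
	f, err := os.OpenFile(incomplete, os.O_WRONLY|os.O_APPEND, 0o666)
	if err != nil {
		panic(err)
	}
	if _, err := io.Copy(f, strings.NewReader("world")); err != nil {
		panic(err)
	}
	f.Close()

	// The resumed bytes were never hash-checked on the wire, so verify the
	// whole file before renaming it into place.
	data, err := os.ReadFile(incomplete)
	if err != nil {
		panic(err)
	}
	got := sha256.Sum256(data)
	want := sha256.Sum256([]byte("hello, world"))
	if !bytes.Equal(got[:], want[:]) {
		panic("hash mismatch: keep the incomplete file for another retry")
	}
	if err := os.Rename(incomplete, final); err != nil {
		panic(err)
	}
	fmt.Println("blob finalized")
}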

pkg/go-containerregistry/pkg/v1/remote/fetcher.go

Lines changed: 70 additions & 4 deletions
@@ -245,30 +245,96 @@ func (f *fetcher) headManifest(ctx context.Context, ref name.Reference, acceptable
 	}, nil
 }

+// contextKey is a type for context keys used in this package
+type contextKey string
+
+const resumeOffsetKey contextKey = "resumeOffset"
+const resumeOffsetsKey contextKey = "resumeOffsets"
+
+// WithResumeOffset returns a context with the resume offset set for a single blob
+func WithResumeOffset(ctx context.Context, offset int64) context.Context {
+	return context.WithValue(ctx, resumeOffsetKey, offset)
+}
+
+// WithResumeOffsets returns a context with resume offsets for multiple blobs (keyed by digest)
+func WithResumeOffsets(ctx context.Context, offsets map[string]int64) context.Context {
+	return context.WithValue(ctx, resumeOffsetsKey, offsets)
+}
+
+// getResumeOffset retrieves the resume offset from the context for a given digest
+func getResumeOffset(ctx context.Context, digest string) int64 {
+	// First check if there's a specific offset for this digest
+	if offsets, ok := ctx.Value(resumeOffsetsKey).(map[string]int64); ok {
+		if offset, found := offsets[digest]; found && offset > 0 {
+			return offset
+		}
+	}
+	// Fall back to a single offset (for fetchBlob)
+	if offset, ok := ctx.Value(resumeOffsetKey).(int64); ok {
+		return offset
+	}
+	return 0
+}
+
 func (f *fetcher) fetchBlob(ctx context.Context, size int64, h v1.Hash) (io.ReadCloser, error) {
 	u := f.url("blobs", h.String())
 	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
 	if err != nil {
 		return nil, err
 	}

+	// Check if we should resume from a specific offset
+	resumeOffset := getResumeOffset(ctx, h.String())
+	if resumeOffset > 0 {
+		// Add a Range header to resume the download
+		req.Header.Set("Range", fmt.Sprintf("bytes=%d-", resumeOffset))
+	}
+
 	resp, err := f.client.Do(req.WithContext(ctx))
 	if err != nil {
 		return nil, redact.Error(err)
 	}

-	if err := transport.CheckError(resp, http.StatusOK); err != nil {
+	// Accept both 200 OK (full content) and 206 Partial Content (resumed)
+	if resumeOffset > 0 {
+		// If we requested a Range but got 200, the server doesn't support
+		// range requests; we'll have to download the full content from scratch
+		if resp.StatusCode == http.StatusOK {
+			resumeOffset = 0
+		}
+	}
+
+	if err := transport.CheckError(resp, http.StatusOK, http.StatusPartialContent); err != nil {
 		resp.Body.Close()
 		return nil, err
 	}

-	// Do whatever we can.
-	// If we have an expected size and Content-Length doesn't match, return an error.
-	// If we don't have an expected size and we do have a Content-Length, use Content-Length.
+	// For partial content (resumed downloads), we can't verify the hash on the stream
+	// since we're only getting part of the file. The complete file will be verified
+	// after all bytes are written to disk.
+	if resumeOffset > 0 && resp.StatusCode == http.StatusPartialContent {
+		// Verify Content-Length matches the expected remaining size
+		if hsize := resp.ContentLength; hsize != -1 {
+			if size != verify.SizeUnknown {
+				expectedRemaining := size - resumeOffset
+				if hsize != expectedRemaining {
+					resp.Body.Close()
+					return nil, fmt.Errorf("GET %s: Content-Length header %d does not match expected remaining size %d", u.String(), hsize, expectedRemaining)
+				}
+			}
+		}
+		// Return the body without stream verification - we'll verify the complete file later
+		return io.NopCloser(resp.Body), nil
+	}
+
+	// For full downloads, verify the stream.
+	// Do whatever we can with size validation.
 	if hsize := resp.ContentLength; hsize != -1 {
 		if size == verify.SizeUnknown {
 			size = hsize
 		} else if hsize != size {
+			resp.Body.Close()
 			return nil, fmt.Errorf("GET %s: Content-Length header %d does not match expected size %d", u.String(), hsize, size)
 		}
 	}
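The status-code handling above encodes the HTTP resume contract: a server that honors Range answers 206 Partial Content with only the remaining bytes, while one that ignores it answers 200 OK with the full body, in which case the offset must be discarded. A self-contained sketch of that contract using net/http/httptest; the server, blob, and offset here are illustrative.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

func main() {
	blob := []byte("0123456789")
	// http.ServeContent implements Range handling for us, so this test
	// server behaves like a registry that supports resumed downloads.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		http.ServeContent(w, r, "blob", time.Time{}, bytes.NewReader(blob))
	}))
	defer srv.Close()

	resumeOffset := int64(4) // pretend 4 bytes are already on disk
	req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-", resumeOffset))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Mirror fetchBlob's fallback: a 200 means the server ignored the
	// Range header, so the body is the full blob and the offset is void.
	if resp.StatusCode == http.StatusOK {
		resumeOffset = 0
	}
	rest, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.StatusCode, resumeOffset, string(rest)) // 206 4 456789
}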
