66 "os"
77 "path/filepath"
88 "strings"
9+ "time"
910
1011 "github.com/docker/model-runner/pkg/distribution/internal/progress"
1112
@@ -78,6 +79,12 @@ type blob interface {
7879 Uncompressed () (io.ReadCloser , error )
7980}
8081
82+ // layerWithDigest extends blob to include the Digest method
83+ type layerWithDigest interface {
84+ blob
85+ Digest () (v1.Hash , error )
86+ }
87+
8188// writeLayer writes the layer blob to the store.
8289// It returns true when a new blob was created and the blob's DiffID.
8390func (s * LocalStore ) writeLayer (layer blob , updates chan <- v1.Update ) (bool , v1.Hash , error ) {
@@ -94,13 +101,28 @@ func (s *LocalStore) writeLayer(layer blob, updates chan<- v1.Update) (bool, v1.
94101 return false , hash , nil
95102 }
96103
104+ // Check if we're resuming an incomplete download
105+ incompleteSize , err := s .GetIncompleteSize (hash )
106+ if err != nil {
107+ return false , v1.Hash {}, fmt .Errorf ("check incomplete size: %w" , err )
108+ }
109+
97110 lr , err := layer .Uncompressed ()
98111 if err != nil {
99112 return false , v1.Hash {}, fmt .Errorf ("get blob contents: %w" , err )
100113 }
101114 defer lr .Close ()
102- r := progress .NewReader (lr , updates )
103115
116+ // Wrap the reader with progress reporting, accounting for already downloaded bytes
117+ var r io.Reader
118+ if incompleteSize > 0 {
119+ r = progress .NewReaderWithOffset (lr , updates , incompleteSize )
120+ } else {
121+ r = progress .NewReader (lr , updates )
122+ }
123+
124+ // WriteBlob will handle appending to incomplete files
125+ // The HTTP layer will handle resuming via Range headers
104126 if err := s .WriteBlob (hash , r ); err != nil {
105127 return false , hash , err
106128 }
@@ -109,6 +131,7 @@ func (s *LocalStore) writeLayer(layer blob, updates chan<- v1.Update) (bool, v1.
109131
110132// WriteBlob writes the blob to the store, reporting progress to the given channel.
111133// If the blob is already in the store, it is a no-op and the blob is not consumed from the reader.
134+ // If an incomplete download exists, it will be resumed by appending to the existing file.
112135func (s * LocalStore ) WriteBlob (diffID v1.Hash , r io.Reader ) error {
113136 hasBlob , err := s .hasBlob (diffID )
114137 if err != nil {
@@ -122,21 +145,63 @@ func (s *LocalStore) WriteBlob(diffID v1.Hash, r io.Reader) error {
122145 if err != nil {
123146 return fmt .Errorf ("get blob path: %w" , err )
124147 }
125- f , err := createFile (incompletePath (path ))
126- if err != nil {
127- return fmt .Errorf ("create blob file: %w" , err )
148+
149+ incompletePath := incompletePath (path )
150+
151+ // Check if we're resuming a partial download
152+ var f * os.File
153+ var isResume bool
154+ if _ , err := os .Stat (incompletePath ); err == nil {
155+ // Resume: open file in append mode
156+ isResume = true
157+ f , err = os .OpenFile (incompletePath , os .O_WRONLY | os .O_APPEND , 0644 )
158+ if err != nil {
159+ return fmt .Errorf ("open incomplete blob file for resume: %w" , err )
160+ }
161+ } else {
162+ // New download: create file
163+ f , err = createFile (incompletePath )
164+ if err != nil {
165+ return fmt .Errorf ("create blob file: %w" , err )
166+ }
128167 }
129- defer os .Remove (incompletePath (path ))
130168 defer f .Close ()
131169
132170 if _ , err := io .Copy (f , r ); err != nil {
171+ // Don't delete the incomplete file on error - we want to resume later
133172 return fmt .Errorf ("copy blob %q to store: %w" , diffID .String (), err )
134173 }
135174
136175 f .Close () // Rename will fail on Windows if the file is still open.
137- if err := os .Rename (incompletePath (path ), path ); err != nil {
176+
177+ // For resumed downloads, verify the complete file's hash before finalizing
178+ // (For new downloads, the stream was already verified during download)
179+ if isResume {
180+ completeFile , err := os .Open (incompletePath )
181+ if err != nil {
182+ return fmt .Errorf ("open completed file for verification: %w" , err )
183+ }
184+ defer completeFile .Close ()
185+
186+ computedHash , _ , err := v1 .SHA256 (completeFile )
187+ if err != nil {
188+ return fmt .Errorf ("compute hash of completed file: %w" , err )
189+ }
190+
191+ if computedHash .String () != diffID .String () {
192+ // The resumed download is corrupt, remove it so we can start fresh next time
193+ _ = os .Remove (incompletePath )
194+ return fmt .Errorf ("hash mismatch after download: got %s, want %s" , computedHash , diffID )
195+ }
196+ }
197+
198+ if err := os .Rename (incompletePath , path ); err != nil {
138199 return fmt .Errorf ("rename blob file: %w" , err )
139200 }
201+
202+ // Only remove incomplete file if rename succeeded (though rename should have moved it)
203+ // This is a safety cleanup in case rename didn't remove the source
204+ os .Remove (incompletePath )
140205 return nil
141206}
142207
@@ -160,6 +225,25 @@ func (s *LocalStore) hasBlob(hash v1.Hash) (bool, error) {
160225 return false , nil
161226}
162227
228+ // GetIncompleteSize returns the size of an incomplete blob if it exists, or 0 if it doesn't.
229+ func (s * LocalStore ) GetIncompleteSize (hash v1.Hash ) (int64 , error ) {
230+ path , err := s .blobPath (hash )
231+ if err != nil {
232+ return 0 , fmt .Errorf ("get blob path: %w" , err )
233+ }
234+
235+ incompletePath := incompletePath (path )
236+ stat , err := os .Stat (incompletePath )
237+ if err != nil {
238+ if os .IsNotExist (err ) {
239+ return 0 , nil
240+ }
241+ return 0 , fmt .Errorf ("stat incomplete file: %w" , err )
242+ }
243+
244+ return stat .Size (), nil
245+ }
246+
163247// createFile is a wrapper around os.Create that creates any parent directories as needed.
164248func createFile (path string ) (* os.File , error ) {
165249 if err := os .MkdirAll (filepath .Dir (path ), 0777 ); err != nil {
@@ -201,3 +285,59 @@ func (s *LocalStore) writeConfigFile(mdl v1.Image) (bool, error) {
201285 }
202286 return true , nil
203287}
288+
289+ // CleanupStaleIncompleteFiles removes incomplete download files that haven't been modified
290+ // for more than the specified duration. This prevents disk space leaks from abandoned downloads.
291+ func (s * LocalStore ) CleanupStaleIncompleteFiles (maxAge time.Duration ) error {
292+ blobsPath := s .blobsDir ()
293+ if _ , err := os .Stat (blobsPath ); os .IsNotExist (err ) {
294+ // Blobs directory doesn't exist yet, nothing to clean up
295+ return nil
296+ }
297+
298+ var cleanedCount int
299+ var cleanupErrors []error
300+
301+ // Walk through the blobs directory looking for .incomplete files
302+ err := filepath .Walk (blobsPath , func (path string , info os.FileInfo , err error ) error {
303+ if err != nil {
304+ // Continue walking even if we encounter errors on individual files
305+ return nil
306+ }
307+
308+ // Skip directories
309+ if info .IsDir () {
310+ return nil
311+ }
312+
313+ // Only process .incomplete files
314+ if ! strings .HasSuffix (path , ".incomplete" ) {
315+ return nil
316+ }
317+
318+ // Check if file is older than maxAge
319+ if time .Since (info .ModTime ()) > maxAge {
320+ if removeErr := os .Remove (path ); removeErr != nil {
321+ cleanupErrors = append (cleanupErrors , fmt .Errorf ("failed to remove stale incomplete file %s: %w" , path , removeErr ))
322+ } else {
323+ cleanedCount ++
324+ }
325+ }
326+
327+ return nil
328+ })
329+
330+ if err != nil {
331+ return fmt .Errorf ("walking blobs directory: %w" , err )
332+ }
333+
334+ if len (cleanupErrors ) > 0 {
335+ return fmt .Errorf ("encountered %d errors during cleanup (cleaned %d files): %v" , len (cleanupErrors ), cleanedCount , cleanupErrors [0 ])
336+ }
337+
338+ if cleanedCount > 0 {
339+ fmt .Printf ("Cleaned up %d stale incomplete download file(s)\n " , cleanedCount )
340+ }
341+
342+ return nil
343+ }
0 commit comments