Skip to content

Commit 62f0fe9

Browse files
committed
download: don't clean up directory after download failed
DownloadDir has the option to ignore paths that we already have on disk. In theory that makes downloads resumable, but in practice this doesn't work because we delete everything if one download fails. This means that if one chunk in a 1TiB block fails to download for whatever reason, we start all over.

Signed-off-by: Michael Hoffmann <[email protected]>
1 parent a0136a6 commit 62f0fe9

File tree

2 files changed

+9
-65
lines changed

2 files changed

+9
-65
lines changed

objstore.go

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"path/filepath"
1515
"slices"
1616
"strings"
17-
"sync"
1817
"time"
1918

2019
"github.com/efficientgo/core/logerrcapture"
@@ -442,7 +441,11 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src,
442441
}
443442

444443
// DownloadDir downloads all object found in the directory into the local directory.
445-
func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error {
444+
func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string, options ...DownloadOption) error {
445+
return downloadDir(ctx, logger, bkt, src, src, dst, options...)
446+
}
447+
448+
func downloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error {
446449
if err := os.MkdirAll(dst, 0750); err != nil {
447450
return errors.Wrap(err, "create dir")
448451
}
@@ -453,19 +456,13 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi
453456
g, ctx := errgroup.WithContext(ctx)
454457
g.SetLimit(opts.concurrency)
455458

456-
var downloadedFiles []string
457-
var m sync.Mutex
458-
459459
err := bkt.Iter(ctx, src, func(name string) error {
460460
g.Go(func() error {
461461
dst := filepath.Join(dst, filepath.Base(name))
462462
if strings.HasSuffix(name, DirDelim) {
463-
if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil {
463+
if err := downloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil {
464464
return err
465465
}
466-
m.Lock()
467-
downloadedFiles = append(downloadedFiles, dst)
468-
m.Unlock()
469466
return nil
470467
}
471468
for _, ignoredPath := range opts.ignoredPaths {
@@ -474,34 +471,14 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi
474471
return nil
475472
}
476473
}
477-
if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil {
478-
return err
479-
}
480-
481-
m.Lock()
482-
downloadedFiles = append(downloadedFiles, dst)
483-
m.Unlock()
484-
return nil
474+
return DownloadFile(ctx, logger, bkt, name, dst)
485475
})
486476
return nil
487477
})
488-
489-
if err == nil {
490-
err = g.Wait()
491-
}
492-
493478
if err != nil {
494-
downloadedFiles = append(downloadedFiles, dst) // Last, clean up the root dst directory.
495-
// Best-effort cleanup if the download failed.
496-
for _, f := range downloadedFiles {
497-
if rerr := os.RemoveAll(f); rerr != nil {
498-
level.Warn(logger).Log("msg", "failed to remove file on partial dir download error", "file", f, "err", rerr)
499-
}
500-
}
501479
return err
502480
}
503-
504-
return nil
481+
return g.Wait()
505482
}
506483

507484
// IsOpFailureExpectedFunc allows to mark certain errors as expected, so they will not increment objstore_bucket_operation_failures_total metric.

objstore_test.go

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
305305
objstore_bucket_operations_total{bucket="",operation="upload"} 3
306306
`), `objstore_bucket_operations_total`))
307307

308-
testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", "dir/", tempDir, WithFetchConcurrency(10)))
308+
testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", tempDir, WithFetchConcurrency(10)))
309309
i, err := os.ReadDir(tempDir)
310310
testutil.Ok(t, err)
311311
testutil.Assert(t, len(i) == 3)
@@ -519,39 +519,6 @@ func TestTimingReader_ShouldCorrectlyWrapFile(t *testing.T) {
519519
testutil.Assert(t, isReaderAt)
520520
}
521521

522-
func TestDownloadDir_CleanUp(t *testing.T) {
523-
b := unreliableBucket{
524-
Bucket: NewInMemBucket(),
525-
n: 3,
526-
current: atomic.NewInt32(0),
527-
}
528-
tempDir := t.TempDir()
529-
530-
testutil.Ok(t, b.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
531-
testutil.Ok(t, b.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
532-
testutil.Ok(t, b.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))
533-
534-
// We exapect the third Get to fail
535-
testutil.NotOk(t, DownloadDir(context.Background(), log.NewNopLogger(), b, "dir/", "dir/", tempDir))
536-
_, err := os.Stat(tempDir)
537-
testutil.Assert(t, os.IsNotExist(err))
538-
}
539-
540-
// unreliableBucket implements Bucket and returns an error on every n-th Get.
541-
type unreliableBucket struct {
542-
Bucket
543-
544-
n int32
545-
current *atomic.Int32
546-
}
547-
548-
func (b unreliableBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
549-
if b.current.Inc()%b.n == 0 {
550-
return nil, errors.Errorf("some error message")
551-
}
552-
return b.Bucket.Get(ctx, name)
553-
}
554-
555522
// mockReader implements io.ReadCloser and allows to mock the functions.
556523
type mockReader struct {
557524
io.Reader

0 commit comments

Comments
 (0)