Skip to content

Commit 3bf609c

Browse files
committed
download: don't clean up directory after download failed
DownloadDir has the option to ignore paths that we already have on disk. In theory that makes downloads resumable, but in practice this doesn't work because we delete everything if one download fails. This means that if one chunk in a 1 TiB block fails to download for whatever reason, we start all over. Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
1 parent a0136a6 commit 3bf609c

File tree

3 files changed

+10
-66
lines changed

3 files changed

+10
-66
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
7373
- [#89](https://github.com/thanos-io/objstore/pull/89) GCS: Upgrade cloud.google.com/go/storage version to `v1.35.1`.
7474
- [#123](https://github.com/thanos-io/objstore/pull/123) *: Upgrade minio-go version to `v7.0.72`.
7575
- [#132](https://github.com/thanos-io/objstore/pull/132) s3: Upgrade aws-sdk-go-v2/config version to `v1.27.30`
76+
- [#182](https://github.com/thanos-io/objstore/pull/182) download: dont cleanup directory after download failed
7677

7778
### Removed

objstore.go

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"path/filepath"
1515
"slices"
1616
"strings"
17-
"sync"
1817
"time"
1918

2019
"github.com/efficientgo/core/logerrcapture"
@@ -442,7 +441,11 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src,
442441
}
443442

444443
// DownloadDir downloads all object found in the directory into the local directory.
445-
func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error {
444+
func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string, options ...DownloadOption) error {
445+
return downloadDir(ctx, logger, bkt, src, src, dst, options...)
446+
}
447+
448+
func downloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error {
446449
if err := os.MkdirAll(dst, 0750); err != nil {
447450
return errors.Wrap(err, "create dir")
448451
}
@@ -453,19 +456,13 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi
453456
g, ctx := errgroup.WithContext(ctx)
454457
g.SetLimit(opts.concurrency)
455458

456-
var downloadedFiles []string
457-
var m sync.Mutex
458-
459459
err := bkt.Iter(ctx, src, func(name string) error {
460460
g.Go(func() error {
461461
dst := filepath.Join(dst, filepath.Base(name))
462462
if strings.HasSuffix(name, DirDelim) {
463-
if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil {
463+
if err := downloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil {
464464
return err
465465
}
466-
m.Lock()
467-
downloadedFiles = append(downloadedFiles, dst)
468-
m.Unlock()
469466
return nil
470467
}
471468
for _, ignoredPath := range opts.ignoredPaths {
@@ -474,34 +471,14 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi
474471
return nil
475472
}
476473
}
477-
if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil {
478-
return err
479-
}
480-
481-
m.Lock()
482-
downloadedFiles = append(downloadedFiles, dst)
483-
m.Unlock()
484-
return nil
474+
return DownloadFile(ctx, logger, bkt, name, dst)
485475
})
486476
return nil
487477
})
488-
489-
if err == nil {
490-
err = g.Wait()
491-
}
492-
493478
if err != nil {
494-
downloadedFiles = append(downloadedFiles, dst) // Last, clean up the root dst directory.
495-
// Best-effort cleanup if the download failed.
496-
for _, f := range downloadedFiles {
497-
if rerr := os.RemoveAll(f); rerr != nil {
498-
level.Warn(logger).Log("msg", "failed to remove file on partial dir download error", "file", f, "err", rerr)
499-
}
500-
}
501479
return err
502480
}
503-
504-
return nil
481+
return g.Wait()
505482
}
506483

507484
// IsOpFailureExpectedFunc allows to mark certain errors as expected, so they will not increment objstore_bucket_operation_failures_total metric.

objstore_test.go

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/pkg/errors"
1919
"github.com/prometheus/client_golang/prometheus"
2020
promtest "github.com/prometheus/client_golang/prometheus/testutil"
21-
"go.uber.org/atomic"
2221
)
2322

2423
func TestMetricBucket_Close(t *testing.T) {
@@ -305,7 +304,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
305304
objstore_bucket_operations_total{bucket="",operation="upload"} 3
306305
`), `objstore_bucket_operations_total`))
307306

308-
testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", "dir/", tempDir, WithFetchConcurrency(10)))
307+
testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", tempDir, WithFetchConcurrency(10)))
309308
i, err := os.ReadDir(tempDir)
310309
testutil.Ok(t, err)
311310
testutil.Assert(t, len(i) == 3)
@@ -519,39 +518,6 @@ func TestTimingReader_ShouldCorrectlyWrapFile(t *testing.T) {
519518
testutil.Assert(t, isReaderAt)
520519
}
521520

522-
func TestDownloadDir_CleanUp(t *testing.T) {
523-
b := unreliableBucket{
524-
Bucket: NewInMemBucket(),
525-
n: 3,
526-
current: atomic.NewInt32(0),
527-
}
528-
tempDir := t.TempDir()
529-
530-
testutil.Ok(t, b.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
531-
testutil.Ok(t, b.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
532-
testutil.Ok(t, b.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))
533-
534-
// We exapect the third Get to fail
535-
testutil.NotOk(t, DownloadDir(context.Background(), log.NewNopLogger(), b, "dir/", "dir/", tempDir))
536-
_, err := os.Stat(tempDir)
537-
testutil.Assert(t, os.IsNotExist(err))
538-
}
539-
540-
// unreliableBucket implements Bucket and returns an error on every n-th Get.
541-
type unreliableBucket struct {
542-
Bucket
543-
544-
n int32
545-
current *atomic.Int32
546-
}
547-
548-
func (b unreliableBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
549-
if b.current.Inc()%b.n == 0 {
550-
return nil, errors.Errorf("some error message")
551-
}
552-
return b.Bucket.Get(ctx, name)
553-
}
554-
555521
// mockReader implements io.ReadCloser and allows to mock the functions.
556522
type mockReader struct {
557523
io.Reader

0 commit comments

Comments
 (0)