diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fee1ccd41..e60b16f331 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,5 +73,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#89](https://github.com/thanos-io/objstore/pull/89) GCS: Upgrade cloud.google.com/go/storage version to `v1.35.1`. - [#123](https://github.com/thanos-io/objstore/pull/123) *: Upgrade minio-go version to `v7.0.72`. - [#132](https://github.com/thanos-io/objstore/pull/132) s3: Upgrade aws-sdk-go-v2/config version to `v1.27.30` +- [#182](https://github.com/thanos-io/objstore/pull/182) *breaking :warning:* download: don't clean up the directory after a failed download; `DownloadDir` no longer takes an `originalSrc` parameter ### Removed diff --git a/objstore.go b/objstore.go index bdbb52a390..58bc51867f 100644 --- a/objstore.go +++ b/objstore.go @@ -14,7 +14,6 @@ import ( "path/filepath" "slices" "strings" - "sync" "time" "github.com/efficientgo/core/logerrcapture" @@ -442,7 +441,11 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, } // DownloadDir downloads all object found in the directory into the local directory. -func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error { +func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string, options ...DownloadOption) error { + return downloadDir(ctx, logger, bkt, src, src, dst, options...) 
+} + +func downloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error { if err := os.MkdirAll(dst, 0750); err != nil { return errors.Wrap(err, "create dir") } @@ -453,19 +456,13 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi g, ctx := errgroup.WithContext(ctx) g.SetLimit(opts.concurrency) - var downloadedFiles []string - var m sync.Mutex - err := bkt.Iter(ctx, src, func(name string) error { g.Go(func() error { dst := filepath.Join(dst, filepath.Base(name)) if strings.HasSuffix(name, DirDelim) { - if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil { + if err := downloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil { return err } - m.Lock() - downloadedFiles = append(downloadedFiles, dst) - m.Unlock() return nil } for _, ignoredPath := range opts.ignoredPaths { @@ -474,34 +471,14 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi return nil } } - if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil { - return err - } - - m.Lock() - downloadedFiles = append(downloadedFiles, dst) - m.Unlock() - return nil + return DownloadFile(ctx, logger, bkt, name, dst) }) return nil }) - - if err == nil { - err = g.Wait() - } - if err != nil { - downloadedFiles = append(downloadedFiles, dst) // Last, clean up the root dst directory. - // Best-effort cleanup if the download failed. - for _, f := range downloadedFiles { - if rerr := os.RemoveAll(f); rerr != nil { - level.Warn(logger).Log("msg", "failed to remove file on partial dir download error", "file", f, "err", rerr) - } - } return err } - - return nil + return g.Wait() } // IsOpFailureExpectedFunc allows to mark certain errors as expected, so they will not increment objstore_bucket_operation_failures_total metric. 
diff --git a/objstore_test.go b/objstore_test.go index 4cde0257e7..1f31b42eea 100644 --- a/objstore_test.go +++ b/objstore_test.go @@ -18,7 +18,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" - "go.uber.org/atomic" ) func TestMetricBucket_Close(t *testing.T) { @@ -305,7 +304,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) { objstore_bucket_operations_total{bucket="",operation="upload"} 3 `), `objstore_bucket_operations_total`)) - testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", "dir/", tempDir, WithFetchConcurrency(10))) + testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", tempDir, WithFetchConcurrency(10))) i, err := os.ReadDir(tempDir) testutil.Ok(t, err) testutil.Assert(t, len(i) == 3) @@ -519,39 +518,6 @@ func TestTimingReader_ShouldCorrectlyWrapFile(t *testing.T) { testutil.Assert(t, isReaderAt) } -func TestDownloadDir_CleanUp(t *testing.T) { - b := unreliableBucket{ - Bucket: NewInMemBucket(), - n: 3, - current: atomic.NewInt32(0), - } - tempDir := t.TempDir() - - testutil.Ok(t, b.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1")))) - testutil.Ok(t, b.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2")))) - testutil.Ok(t, b.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3")))) - - // We exapect the third Get to fail - testutil.NotOk(t, DownloadDir(context.Background(), log.NewNopLogger(), b, "dir/", "dir/", tempDir)) - _, err := os.Stat(tempDir) - testutil.Assert(t, os.IsNotExist(err)) -} - -// unreliableBucket implements Bucket and returns an error on every n-th Get. 
-type unreliableBucket struct { - Bucket - - n int32 - current *atomic.Int32 -} - -func (b unreliableBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - if b.current.Inc()%b.n == 0 { - return nil, errors.Errorf("some error message") - } - return b.Bucket.Get(ctx, name) -} - // mockReader implements io.ReadCloser and allows to mock the functions. type mockReader struct { io.Reader