Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#89](https://github.com/thanos-io/objstore/pull/89) GCS: Upgrade cloud.google.com/go/storage version to `v1.35.1`.
- [#123](https://github.com/thanos-io/objstore/pull/123) *: Upgrade minio-go version to `v7.0.72`.
- [#132](https://github.com/thanos-io/objstore/pull/132) s3: Upgrade aws-sdk-go-v2/config version to `v1.27.30`
- [#182](https://github.com/thanos-io/objstore/pull/182) download: dont cleanup directory after download failed

### Removed
39 changes: 8 additions & 31 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"path/filepath"
"slices"
"strings"
"sync"
"time"

"github.com/efficientgo/core/logerrcapture"
Expand Down Expand Up @@ -442,7 +441,11 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src,
}

// DownloadDir downloads all object found in the directory into the local directory.
func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error {
func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string, options ...DownloadOption) error {
return downloadDir(ctx, logger, bkt, src, src, dst, options...)
}

func downloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering first of all whether downloadDir shouldn't return immediately if ctx.Err() is non-nil. The reason is that downloadDir can be called recursively with a context from errgroup.WithContext, that gets canceled when one or more of the goroutines fail.

if err := os.MkdirAll(dst, 0750); err != nil {
return errors.Wrap(err, "create dir")
}
Expand All @@ -453,19 +456,13 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(opts.concurrency)

var downloadedFiles []string
var m sync.Mutex

err := bkt.Iter(ctx, src, func(name string) error {
g.Go(func() error {
dst := filepath.Join(dst, filepath.Base(name))
if strings.HasSuffix(name, DirDelim) {
if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil {
if err := downloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil {
return err
}
m.Lock()
downloadedFiles = append(downloadedFiles, dst)
m.Unlock()
return nil
}
for _, ignoredPath := range opts.ignoredPaths {
Expand All @@ -474,34 +471,14 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi
return nil
}
}
if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil {
return err
}

m.Lock()
downloadedFiles = append(downloadedFiles, dst)
m.Unlock()
return nil
return DownloadFile(ctx, logger, bkt, name, dst)
Copy link
Contributor

@aknuds1 aknuds1 Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does DownloadFile respect cancellation via ctx? It ought to, so download can halt when one or more gouroutines fail.

})
return nil
})

if err == nil {
err = g.Wait()
}

if err != nil {
downloadedFiles = append(downloadedFiles, dst) // Last, clean up the root dst directory.
// Best-effort cleanup if the download failed.
for _, f := range downloadedFiles {
if rerr := os.RemoveAll(f); rerr != nil {
level.Warn(logger).Log("msg", "failed to remove file on partial dir download error", "file", f, "err", rerr)
}
}
return err
}

return nil
return g.Wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not calling g.Wait if err is non-nil? It seems wrong to me not to wait on the child gouroutines. I am also wondering whether a cancelable ctx should be created, that gets passed to errgroup.WithContext(ctx), and gets canceled when bkt.Iter returns an error. WDYT?

}

// IsOpFailureExpectedFunc allows to mark certain errors as expected, so they will not increment objstore_bucket_operation_failures_total metric.
Expand Down
36 changes: 1 addition & 35 deletions objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"go.uber.org/atomic"
)

func TestMetricBucket_Close(t *testing.T) {
Expand Down Expand Up @@ -305,7 +304,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
objstore_bucket_operations_total{bucket="",operation="upload"} 3
`), `objstore_bucket_operations_total`))

testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", "dir/", tempDir, WithFetchConcurrency(10)))
testutil.Ok(t, DownloadDir(context.Background(), log.NewNopLogger(), m, "dir/", tempDir, WithFetchConcurrency(10)))
i, err := os.ReadDir(tempDir)
testutil.Ok(t, err)
testutil.Assert(t, len(i) == 3)
Expand Down Expand Up @@ -519,39 +518,6 @@ func TestTimingReader_ShouldCorrectlyWrapFile(t *testing.T) {
testutil.Assert(t, isReaderAt)
}

func TestDownloadDir_CleanUp(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you keep this test, and assert that the downloaded files are kept?

b := unreliableBucket{
Bucket: NewInMemBucket(),
n: 3,
current: atomic.NewInt32(0),
}
tempDir := t.TempDir()

testutil.Ok(t, b.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
testutil.Ok(t, b.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
testutil.Ok(t, b.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))

// We exapect the third Get to fail
testutil.NotOk(t, DownloadDir(context.Background(), log.NewNopLogger(), b, "dir/", "dir/", tempDir))
_, err := os.Stat(tempDir)
testutil.Assert(t, os.IsNotExist(err))
}

// unreliableBucket implements Bucket and returns an error on every n-th Get.
type unreliableBucket struct {
Bucket

n int32
current *atomic.Int32
}

func (b unreliableBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
if b.current.Inc()%b.n == 0 {
return nil, errors.Errorf("some error message")
}
return b.Bucket.Get(ctx, name)
}

// mockReader implements io.ReadCloser and allows to mock the functions.
type mockReader struct {
io.Reader
Expand Down
Loading