Skip to content

Feat S3 Transfer Manager v2 DownloadDirectory #3163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions feature/s3/transfermanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ type S3APIClient interface {
AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
HeadObject(context.Context, *s3.HeadObjectInput, ...func(*s3.Options)) (*s3.HeadObjectOutput, error)
ListObjectsV2(context.Context, *s3.ListObjectsV2Input, ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
}
252 changes: 252 additions & 0 deletions feature/s3/transfermanager/api_op_DownloadDirectory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
package transfermanager

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"sync"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// DownloadDirectoryInput represents a request to the DownloadDirectory() call
type DownloadDirectoryInput struct {
// Bucket where objects are downloaded from
Bucket string

// The destination directory to download
Destination string

// The S3 key prefix to use for listing objects. If not provided,
// all objects under a bucket will be retrieved
KeyPrefix string

// The s3 delimiter used to convert keyname to local filepath if it
// is different from local file separator
S3Delimiter string

// A callback func to allow users to fileter out unwanted objects
// according to bool returned from the function
Filter ObjectFilter

// A callback function to allow customers to update individual
// GetObjectInput that the S3 Transfer Manager generates
Callback GetRequestCallback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You seem to be missing a few inputs from the SEP

  • recursive
  • failurePolicy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked offline, design for these seems to be somewhat in-flux so we can defer for now.

}

// ObjectFilter is the callback to allow users to filter out unwanted objects.
// It is invoked for each object listed.
type ObjectFilter interface {
// FilterObject take the Object struct and decides if the
// object should be downloaded
FilterObject(s3types.Object) bool
}

// GetRequestCallback is the callback mechanism to allow customers to update
// individual GetObjectInput that the S3 Transfer Manager generates
type GetRequestCallback interface {
// UpdateRequest preprocesses each GetObjectInput as customized
UpdateRequest(*GetObjectInput)
}

// DownloadDirectoryOutput represents a response from the DownloadDirectory() call
type DownloadDirectoryOutput struct {
// Total number of objects successfully downloaded
ObjectsDownloaded int
}

type objectEntry struct {
key string
path string
}

// DownloadDirectory traverses a s3 bucket and intelligently downloads all valid objects
// to local directory in parallel across multiple goroutines. You can configure the concurrency,
// valid object filtering and hierarchical file naming through the Options and input parameters.
//
// Additional functional options can be provided to configure the individual directory
// download. These options are copies of the original Options instance, the client of which DownloadDirectory is called from.
// Modifying the options will not impact the original Client and Options instance.
func (c *Client) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput, opts ...func(*Options)) (*DownloadDirectoryOutput, error) {
fileInfo, err := os.Stat(input.Destination)
if err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("error when getting destination folder info: %v", err)
}
} else if !fileInfo.IsDir() {
return nil, fmt.Errorf("the destination path %s doesn't point to a valid directory", input.Destination)

}

i := directoryDownloader{c: c, in: input, options: c.options.Copy()}
for _, opt := range opts {
opt(&i.options)
}

return i.downloadDirectory(ctx)
}

type directoryDownloader struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Progress listener support?

c *Client
options Options
in *DownloadDirectoryInput

objectsDownloaded int

err error

mu sync.Mutex
wg sync.WaitGroup
}

func (d *directoryDownloader) downloadDirectory(ctx context.Context) (*DownloadDirectoryOutput, error) {
d.init()
ch := make(chan objectEntry)

for i := 0; i < d.options.DirectoryConcurrency; i++ {
d.wg.Add(1)
go d.downloadObject(ctx, ch)
}

isTruncated := true
continuationToken := ""
for isTruncated {
if d.getErr() != nil {
break
}
listOutput, err := d.options.S3.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.in.Bucket),
Prefix: nzstring(d.in.KeyPrefix),
ContinuationToken: nzstring(continuationToken),
})
if err != nil {
d.setErr(fmt.Errorf("error when listing objects %v", err))
break
}

for _, o := range listOutput.Contents {
key := aws.ToString(o.Key)
if strings.HasSuffix(key, "/") || strings.HasSuffix(key, d.in.S3Delimiter) {
continue // skip folder object
}
if d.in.Filter != nil && !d.in.Filter.FilterObject(o) {
continue
}
path, err := d.getLocalPath(key)
if err != nil {
d.setErr(fmt.Errorf("error when resolving local path for object %s, %v", key, err))
break
}
ch <- objectEntry{key, path}
}

continuationToken = aws.ToString(listOutput.NextContinuationToken)
isTruncated = aws.ToBool(listOutput.IsTruncated)
}

close(ch)
d.wg.Wait()

if d.err != nil {
return nil, d.err
}

return &DownloadDirectoryOutput{
ObjectsDownloaded: d.objectsDownloaded,
}, nil
}

func (d *directoryDownloader) init() {
if d.in.S3Delimiter == "" {
d.in.S3Delimiter = "/"
}
}

func (d *directoryDownloader) getLocalPath(key string) (string, error) {
keyprefix := d.in.KeyPrefix
if keyprefix != "" && !strings.HasSuffix(keyprefix, d.in.S3Delimiter) {
keyprefix = keyprefix + d.in.S3Delimiter
}
path := filepath.Join(d.in.Destination, strings.ReplaceAll(strings.TrimPrefix(key, keyprefix), d.in.S3Delimiter, string(os.PathSeparator)))
relPath, err := filepath.Rel(d.in.Destination, path)
if err != nil {
return "", err
}
if relPath == "." || strings.Contains(relPath, "..") {
return "", fmt.Errorf("resolved local path %s is outside of destination %s", path, d.in.Destination)
}

return path, nil
}

func (d *directoryDownloader) downloadObject(ctx context.Context, ch chan objectEntry) {
defer d.wg.Done()
for {
data, ok := <-ch
if !ok {
break
}
if d.getErr() != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should likely also be checking for context cancellation here (after the channel read).

break
}

input := &GetObjectInput{
Bucket: d.in.Bucket,
Key: data.key,
}
if d.in.Callback != nil {
d.in.Callback.UpdateRequest(input)
}
out, err := d.c.GetObject(ctx, input)
if err != nil {
d.setErr(fmt.Errorf("error when downloading object %s: %v", data.key, err))
break
}

err = os.MkdirAll(filepath.Dir(data.path), os.ModePerm)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

os.ModePerm is 0777, i.e. anyone on the system can read/write/execute/cd into it. I doubt that's what we want to set here. I don't see a set of default permissions mentioned in the SEP, I would seek clarification from the authors.

if err != nil {
d.setErr(fmt.Errorf("error when creating directory for file %s: %v", data.path, err))
break
}
file, err := os.Create(data.path)
if err != nil {
d.setErr(fmt.Errorf("error when creating file %s: %v", data.path, err))
break
}
_, err = io.Copy(file, out.Body)
if err != nil {
d.setErr(fmt.Errorf("error when writing to local file %s: %v", data.path, err))
os.Remove(data.path)
break
}
d.incrObjectsDownloaded(1)
}
}

func (d *directoryDownloader) incrObjectsDownloaded(n int) {
d.mu.Lock()
defer d.mu.Unlock()

d.objectsDownloaded += n
}

func (d *directoryDownloader) setErr(err error) {
d.mu.Lock()
defer d.mu.Unlock()

d.err = err
}

func (d *directoryDownloader) getErr() error {
d.mu.Lock()
defer d.mu.Unlock()

return d.err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//go:build integration
// +build integration

package transfermanager

import (
"testing"
)

func TestInteg_DownloadDirectory(t *testing.T) {
cases := map[string]downloadDirectoryTestData{
"multi objects with prefix": {
ObjectsSize: map[string]int64{
"oii/bar": 2 * 1024 * 1024,
"oiibaz/zoo": 10 * 1024 * 1024,
"oii/baz/zoo": 10 * 1024 * 1024,
"oi": 20 * 1024 * 1024,
},
KeyPrefix: "oii",
ExpectObjectsDownloaded: 3,
ExpectFiles: []string{"bar", "oiibaz/zoo", "baz/zoo"},
},
"multi file with prefix and custom delimiter": {
ObjectsSize: map[string]int64{
"yee#bar": 2 * 1024 * 1024,
"yee#baz#": 0,
"yee#baz#zoo": 10 * 1024 * 1024,
"yee#oii@zoo": 10 * 1024 * 1024,
"yee#yee#..#bla": 2 * 1024 * 1024,
"ye": 20 * 1024 * 1024,
},
KeyPrefix: "yee#",
Delimiter: "#",
ExpectObjectsDownloaded: 4,
ExpectFiles: []string{"bar", "baz/zoo", "oii@zoo", "bla"},
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
testDownloadDirectory(t, setupMetadata.Buckets.Source.Name, c)
})
}
}
30 changes: 15 additions & 15 deletions feature/s3/transfermanager/api_op_UploadDirectory.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,22 +315,22 @@ func (u *directoryUploader) uploadFile(ctx context.Context, ch chan fileEntry) {
f, err := os.Open(data.path)
if err != nil {
u.setErr(fmt.Errorf("error when opening file %s: %v", data.path, err))
} else {
input := &PutObjectInput{
Bucket: u.in.Bucket,
Key: data.key,
Body: f,
}
if u.in.Callback != nil {
u.in.Callback.UpdateRequest(input)
}
_, err := u.c.PutObject(ctx, input)
if err != nil {
u.setErr(fmt.Errorf("error when uploading file %s: %v", data.path, err))
} else {
u.incrFilesUploaded(1)
}
break
}
input := &PutObjectInput{
Bucket: u.in.Bucket,
Key: data.key,
Body: f,
}
if u.in.Callback != nil {
u.in.Callback.UpdateRequest(input)
}
_, err = u.c.PutObject(ctx, input)
if err != nil {
u.setErr(fmt.Errorf("error when uploading file %s: %v", data.path, err))
break
}
u.incrFilesUploaded(1)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestInteg_UploadDirectory(t *testing.T) {
ExpectFilesUploaded: 3,
ExpectKeys: []string{"bla/foo", "bla/to/bar", "bla/to/the/baz"},
},
"multi file recursive with prefix and custome delimiter": {
"multi file recursive with prefix and custom delimiter": {
FilesSize: map[string]int64{
"foo": 2 * 1024 * 1024,
"to/bar": 10 * 1024 * 1024,
Expand Down
Loading
Loading