Skip to content

Commit 3c6e2bb

Browse files
adrienthebo authored and csweichel committed
implement download, upload
1 parent 881151a commit 3c6e2bb

File tree

3 files changed

+147
-12
lines changed

3 files changed

+147
-12
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ require (
3535
github.com/aws/aws-sdk-go-v2/config v1.18.8 // indirect
3636
github.com/aws/aws-sdk-go-v2/credentials v1.13.8 // indirect
3737
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21 // indirect
38+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.47 // indirect
3839
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27 // indirect
3940
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21 // indirect
4041
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect
@@ -55,6 +56,7 @@ require (
5556
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
5657
github.com/godbus/dbus/v5 v5.1.0 // indirect
5758
github.com/inconshreveable/mousetrap v1.0.1 // indirect
59+
github.com/jmespath/go-jmespath v0.4.0 // indirect
5860
github.com/moby/sys/mountinfo v0.6.2 // indirect
5961
github.com/opencontainers/selinux v1.10.2 // indirect
6062
github.com/pkg/errors v0.9.1 // indirect

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ github.com/aws/aws-sdk-go-v2/credentials v1.13.8 h1:vTrwTvv5qAwjWIGhZDSBH/oQHuIQ
99
github.com/aws/aws-sdk-go-v2/credentials v1.13.8/go.mod h1:lVa4OHbvgjVot4gmh1uouF1ubgexSCN92P6CJQpT0t8=
1010
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21 h1:j9wi1kQ8b+e0FBVHxCqCGo4kxDU175hoDHcWAi0sauU=
1111
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.21/go.mod h1:ugwW57Z5Z48bpvUyZuaPy4Kv+vEfJWnIrky7RmkBvJg=
12+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.47 h1:E884ndKWVGt8IhtUuGhXbEsmaCvdAAkTTUDu7uAok1g=
13+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.47/go.mod h1:KybsEsmXLO0u75FyS3F0sY4OQ97syDe8z+ISq8oEczA=
1214
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27 h1:I3cakv2Uy1vNmmhRQmFptYDxOvBnwCdNwyw63N0RaRU=
1315
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27/go.mod h1:a1/UpzeyBBerajpnP5nGZa9mGzsBn5cOKxm6NWQsvoI=
1416
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21 h1:5NbbMrIzmUn/TXFqAle6mgrH5m9cOvMLRGL7pnG8tRE=
@@ -88,6 +90,7 @@ github.com/in-toto/in-toto-golang v0.3.3 h1:tkkEBU5i09UEeWKnrp6Rq4fXKAfpVXYMLRO5
8890
github.com/in-toto/in-toto-golang v0.3.3/go.mod h1:dbXecHGZSqRubmm5TXtvDSZT5JyaKD7ebVTiC2aMLWY=
8991
github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc=
9092
github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
93+
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
9194
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
9295
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
9396
github.com/karrick/godirwalk v1.17.0 h1:b4kY7nqDdioR/6qnbHQyDvmA17u5G1cZ6J+CZXwSWoI=

pkg/leeway/cache.go

Lines changed: 142 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/aws/aws-sdk-go-v2/aws"
1818
"github.com/aws/aws-sdk-go-v2/config"
19+
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
1920
"github.com/aws/aws-sdk-go-v2/service/s3"
2021
"github.com/aws/aws-sdk-go-v2/service/s3/types"
2122
"github.com/aws/aws-sdk-go-v2/service/sts"
@@ -250,10 +251,10 @@ type S3RemoteCache struct {
250251
func NewS3RemoteCache(bucketName string, cfg *aws.Config) (*S3RemoteCache, error) {
251252
if cfg == nil {
252253
v, err := config.LoadDefaultConfig(context.TODO())
253-
cfg = &v
254254
if err != nil {
255255
return nil, fmt.Errorf("cannot load s3 config: %s", err)
256256
}
257+
cfg = &v
257258
}
258259
s3Client := s3.NewFromConfig(*cfg)
259260

@@ -287,7 +288,7 @@ func (rs *S3RemoteCache) ExistingPackages(pkgs []*Package) (map[*Package]struct{
287288
continue
288289
}
289290

290-
packagesToKeys[p] = fmt.Sprintf("%s.tar.gz", version)
291+
packagesToKeys[p] = filepath.Base(fmt.Sprintf("%s.tar.gz", version))
291292
}
292293

293294
if len(packagesToKeys) == 0 {
@@ -296,12 +297,12 @@ func (rs *S3RemoteCache) ExistingPackages(pkgs []*Package) (map[*Package]struct{
296297
log.Debugf("Checking if %d packages exist in the remote cache using s3", len(packagesToKeys))
297298

298299
ch := make(chan *Package, len(packagesToKeys))
299-
defer close(ch)
300300

301301
existingPackages := make(map[*Package]struct{})
302302
wg := sync.WaitGroup{}
303303

304304
for pkg, key := range packagesToKeys {
305+
wg.Add(1)
305306
go func(pkg *Package, key string) {
306307
defer wg.Done()
307308

@@ -311,25 +312,108 @@ func (rs *S3RemoteCache) ExistingPackages(pkgs []*Package) (map[*Package]struct{
311312
ch <- pkg
312313
}
313314
}(pkg, key)
314-
315-
wg.Add(1)
316315
}
317316
wg.Wait()
317+
close(ch)
318+
319+
for p := range ch {
320+
existingPackages[p] = struct{}{}
321+
}
318322

319323
return existingPackages, nil
320324
}
321325

322326
// Download makes a best-effort attempt at downloading previously cached build artifacts for the given packages
323327
// in their current version. A cache miss (i.e. a build artifact not being available) does not constitute an
324328
// error. Get should try and download as many artifacts as possible.
325-
func (s3 *S3RemoteCache) Download(dst Cache, pkgs []*Package) error {
326-
panic("not implemented") // TODO: Implement
329+
func (rs *S3RemoteCache) Download(dst Cache, pkgs []*Package) error {
330+
fmt.Printf("☁️ downloading %d cached build artifacts from s3 remote cache\n", len(pkgs))
331+
var (
332+
files []string
333+
dest string
334+
)
335+
336+
for _, pkg := range pkgs {
337+
fn, exists := dst.Location(pkg)
338+
if exists {
339+
continue
340+
}
341+
342+
if dest == "" {
343+
dest = filepath.Dir(fn)
344+
} else if dest != filepath.Dir(fn) {
345+
return xerrors.Errorf("s3 cache only supports one target folder, not %s and %s", dest, filepath.Dir(fn))
346+
}
347+
348+
files = append(files, fmt.Sprintf("%s/%s", rs.BucketName, strings.TrimLeft(fn, "/")))
349+
}
350+
351+
wg := sync.WaitGroup{}
352+
353+
for _, file := range files {
354+
wg.Add(1)
355+
356+
go func(file string) {
357+
defer wg.Done()
358+
359+
key := filepath.Base(file)
360+
fields := log.Fields{
361+
"key": key,
362+
"bucket": rs.BucketName,
363+
"region": rs.s3Config.Region,
364+
}
365+
log.WithFields(fields).Debug("downloading object from s3")
366+
len, err := rs.getObject(context.TODO(), key, fmt.Sprintf("%s/%s", dest, key))
367+
if err != nil {
368+
log.WithFields(fields).Warnf("failed to download and store object %s from s3: %s", key, err)
369+
} else {
370+
log.WithFields(fields).Debugf("downloaded %d byte object from s3 to %s", len, file)
371+
}
372+
373+
}(file)
374+
}
375+
wg.Wait()
376+
377+
return nil
327378
}
328379

329380
// Upload makes a best effort to upload the build arfitacts to a remote cache. If uploading an artifact fails, that
330381
// does not constitute an error.
331-
func (s3 *S3RemoteCache) Upload(src Cache, pkgs []*Package) error {
332-
panic("not implemented") // TODO: Implement
382+
func (rs *S3RemoteCache) Upload(src Cache, pkgs []*Package) error {
383+
var files []string
384+
for _, pkg := range pkgs {
385+
file, exists := src.Location(pkg)
386+
if !exists {
387+
continue
388+
}
389+
files = append(files, file)
390+
}
391+
fmt.Fprintf(os.Stdout, "☁️ uploading %d build artifacts to s3 remote cache\n", len(files))
392+
393+
wg := sync.WaitGroup{}
394+
395+
for _, file := range files {
396+
wg.Add(1)
397+
go func(file string) {
398+
defer wg.Done()
399+
key := filepath.Base(file)
400+
fields := log.Fields{
401+
"key": key,
402+
"bucket": rs.BucketName,
403+
"region": rs.s3Config.Region,
404+
}
405+
log.WithFields(fields).Debug("uploading object to s3")
406+
res, err := rs.uploadObject(context.TODO(), filepath.Base(file), file)
407+
if err != nil {
408+
log.WithFields(fields).Warnf("Failed to upload object to s3: %s", err)
409+
} else {
410+
log.WithFields(fields).Debugf("completed upload to %s", res.Location)
411+
}
412+
}(file)
413+
}
414+
wg.Wait()
415+
416+
return nil
333417
}
334418

335419
func (rs *S3RemoteCache) hasBucket(ctx context.Context) (bool, error) {
@@ -356,11 +440,10 @@ func (rs *S3RemoteCache) hasBucket(ctx context.Context) (bool, error) {
356440
}
357441

358442
func (rs *S3RemoteCache) hasObject(ctx context.Context, key string) (bool, error) {
359-
cfg := *rs.s3Config
360443
fields := log.Fields{
361444
"key": key,
362445
"bucket": rs.BucketName,
363-
"region": cfg.Region,
446+
"region": rs.s3Config.Region,
364447
}
365448
log.WithFields(fields).Debugf("Checking s3 for cached package")
366449

@@ -377,7 +460,10 @@ func (rs *S3RemoteCache) hasObject(ctx context.Context, key string) (bool, error
377460
}
378461

379462
// We've received an error that's not a simple missing key error. Collect more information
380-
_, _ = rs.hasBucket(ctx)
463+
hasBucket, _ := rs.hasBucket(ctx)
464+
if !hasBucket {
465+
return false, err
466+
}
381467

382468
log.WithFields(fields).Warnf("S3 GetObject failed: %s", err)
383469
return false, err
@@ -386,3 +472,47 @@ func (rs *S3RemoteCache) hasObject(ctx context.Context, key string) (bool, error
386472
// XXX
387473
return true, nil
388474
}
475+
476+
func (rs *S3RemoteCache) getObject(ctx context.Context, key string, path string) (int64, error) {
477+
478+
var partMiBs int64 = 10
479+
downloader := manager.NewDownloader(rs.s3Client, func(d *manager.Downloader) {
480+
d.PartSize = partMiBs * 1024 * 1024
481+
})
482+
buffer := manager.NewWriteAtBuffer([]byte{})
483+
res, err := downloader.Download(context.TODO(), buffer, &s3.GetObjectInput{
484+
Bucket: aws.String(rs.BucketName),
485+
Key: aws.String(key),
486+
})
487+
if err != nil {
488+
return res, err
489+
}
490+
491+
err = os.WriteFile(path, buffer.Bytes(), 0644)
492+
if err != nil {
493+
return 0, xerrors.Errorf("failed to write s3 download to %s: %s", path, err)
494+
}
495+
return res, nil
496+
}
497+
498+
func (rs *S3RemoteCache) uploadObject(ctx context.Context, key string, path string) (*manager.UploadOutput, error) {
499+
500+
fN, err := os.Open(path)
501+
if err != nil {
502+
return nil, xerrors.Errorf("cannot open %s for S3 upload: %s", path, err)
503+
}
504+
505+
var partMiBs int64 = 10
506+
uploader := manager.NewUploader(rs.s3Client, func(u *manager.Uploader) {
507+
u.PartSize = partMiBs * 1024 * 1024
508+
})
509+
res, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{
510+
Bucket: aws.String(rs.BucketName),
511+
Key: aws.String(key),
512+
Body: fN,
513+
})
514+
if err != nil {
515+
return res, err
516+
}
517+
return res, nil
518+
}

0 commit comments

Comments
 (0)