@@ -24,6 +24,10 @@ import (
 	"golang.org/x/xerrors"
 )
 
+// The part size of S3 bucket uploads/downloads for multipart file operations.
+// See also: https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
+const S3_PART_SIZE = 5 * 1024 * 1024
+
 // Cache provides filesystem locations for package build artifacts.
 type Cache interface {
 	// Location returns the absolute filesystem path for a package build artifact
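Aside (not part of the diff): 5 MiB is the smallest part size S3 accepts for multipart uploads, and a multipart upload is capped at 10,000 parts, so this constant bounds the largest cacheable artifact. A quick sanity-check sketch; the lowercase names are illustrative mirrors of the constant above:

```go
package main

import "fmt"

// Assumed mirror of S3_PART_SIZE; 5 MiB is S3's minimum multipart part
// size, and S3 allows at most 10,000 parts per multipart upload.
const (
	s3PartSize = 5 * 1024 * 1024
	s3MaxParts = 10_000
)

func main() {
	// Largest object a fixed 5 MiB part size can cover: ~48.8 GiB.
	fmt.Println(int64(s3PartSize) * s3MaxParts)
}
```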
@@ -241,7 +245,7 @@ func gsutilTransfer(target string, files []string) error {
 	return nil
 }
 
-// S3RemoteCache uses the AWS Go SDK to implement a remote cache
+// S3RemoteCache uses the AWS Go SDK to implement a remote cache.
 type S3RemoteCache struct {
 	BucketName string
 	s3Config   *aws.Config
@@ -259,6 +263,8 @@ func NewS3RemoteCache(bucketName string, cfg *aws.Config) (*S3RemoteCache, error
 	s3Client := s3.NewFromConfig(*cfg)
 
 	log.DebugFn(func() []interface{} {
+		// When the log level is set to debug, fetch the AWS caller identity in case there's
+		// uncertainty about which AWS profile is active and interacting with the caching bucket.
 		stsClient := sts.NewFromConfig(*cfg)
 		identity, err := stsClient.GetCallerIdentity(context.TODO(), &sts.GetCallerIdentityInput{})
 		if err != nil {
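For reviewers who want to reproduce this debug output outside the build, a minimal self-contained sketch of the same caller-identity lookup, assuming the default credential chain resolves the active profile:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sts"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		log.Fatal(err)
	}
	// Ask STS who we are; mirrors the debug-level lookup added in the diff.
	identity, err := sts.NewFromConfig(cfg).GetCallerIdentity(context.TODO(), &sts.GetCallerIdentityInput{})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("account=%s arn=%s\n", *identity.Account, *identity.Arn)
}
```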
@@ -301,13 +307,16 @@ func (rs *S3RemoteCache) ExistingPackages(pkgs []*Package) (map[*Package]struct{
 	existingPackages := make(map[*Package]struct{})
 	wg := sync.WaitGroup{}
 
+	ctx := context.TODO()
 	for pkg, key := range packagesToKeys {
 		wg.Add(1)
 		go func(pkg *Package, key string) {
 			defer wg.Done()
 
-			stat, _ := rs.hasObject(context.TODO(), key)
-			// TODO error handling
+			stat, err := rs.hasObject(ctx, key)
+			if err != nil {
+				log.WithField("bucket", rs.BucketName).WithField("key", key).Debugf("Failed to check for remote cached object: %s", err)
+			}
 			if stat {
 				ch <- pkg
 			}
@@ -319,6 +328,7 @@ func (rs *S3RemoteCache) ExistingPackages(pkgs []*Package) (map[*Package]struct{
 	for p := range ch {
 		existingPackages[p] = struct{}{}
 	}
+	log.WithField("bucket", rs.BucketName).Debugf("%d/%d packages found in remote cache", len(existingPackages), len(packagesToKeys))
 
 	return existingPackages, nil
 }
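The two hunks above only show part of ExistingPackages' concurrency; presumably the wg.Wait()/close(ch) plumbing sits between them. A reduced, self-contained sketch of the overall fan-out/collect shape, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

// existingKeys fans out one goroutine per key, funnels hits through a
// channel, and closes the channel once every check has returned so the
// collecting range loop terminates.
func existingKeys(keys []string, has func(string) bool) map[string]struct{} {
	ch := make(chan string)
	var wg sync.WaitGroup
	for _, k := range keys {
		wg.Add(1)
		go func(k string) {
			defer wg.Done()
			if has(k) {
				ch <- k
			}
		}(k)
	}
	go func() {
		wg.Wait()
		close(ch)
	}()
	existing := make(map[string]struct{})
	for k := range ch {
		existing[k] = struct{}{}
	}
	return existing
}

func main() {
	fmt.Println(existingKeys([]string{"a", "b"}, func(k string) bool { return k == "a" }))
}
```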
@@ -350,6 +360,7 @@ func (rs *S3RemoteCache) Download(dst Cache, pkgs []*Package) error {
 
 	wg := sync.WaitGroup{}
 
+	ctx := context.TODO()
 	for _, file := range files {
 		wg.Add(1)
 		go func(file string) {
@@ -363,7 +374,7 @@ func (rs *S3RemoteCache) Download(dst Cache, pkgs []*Package) error {
 				"region": rs.s3Config.Region,
 			}
 			log.WithFields(fields).Debug("downloading object from s3")
-			len, err := rs.getObject(context.TODO(), key, fmt.Sprintf("%s/%s", dest, key))
+			len, err := rs.getObject(ctx, key, fmt.Sprintf("%s/%s", dest, key))
 			if err != nil {
 				log.WithFields(fields).Warnf("failed to download and store object %s from s3: %s", key, err)
 			} else {
@@ -392,6 +403,7 @@ func (rs *S3RemoteCache) Upload(src Cache, pkgs []*Package) error {
 
 	wg := sync.WaitGroup{}
 
+	ctx := context.TODO()
 	for _, file := range files {
 		wg.Add(1)
 		go func(file string) {
@@ -403,7 +415,7 @@ func (rs *S3RemoteCache) Upload(src Cache, pkgs []*Package) error {
 				"region": rs.s3Config.Region,
 			}
 			log.WithFields(fields).Debug("uploading object to s3")
-			res, err := rs.uploadObject(context.TODO(), filepath.Base(file), file)
+			res, err := rs.uploadObject(ctx, filepath.Base(file), file)
 			if err != nil {
 				log.WithFields(fields).Warnf("Failed to upload object to s3: %s", err)
 			} else {
@@ -447,6 +459,10 @@ func (rs *S3RemoteCache) hasObject(ctx context.Context, key string) (bool, error
 	}
 	log.WithFields(fields).Debugf("Checking s3 for cached package")
 
+	// The AWS HeadObject API call would be slightly more suitable here, but the Go API
+	// buries missing-object errors behind internal types. Using GetObject with a zero
+	// byte range yields a clearer error that can indicate the absence of an object, at
+	// the expense of a slightly more expensive API call.
 	_, err := rs.s3Client.GetObject(ctx, &s3.GetObjectInput{
 		Bucket: aws.String(rs.BucketName),
 		Key:    aws.String(key),
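The hunk cuts off before the range header is set. A sketch of the presumed probe, assuming a one-byte range ("bytes=0-0") and the s3/types error type for missing keys; objectExists is a hypothetical standalone equivalent, not the PR's code:

```go
package probe

import (
	"context"
	"errors"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// objectExists distinguishes "key absent" from other failures when probing
// with GetObject instead of HeadObject.
func objectExists(ctx context.Context, client *s3.Client, bucket, key string) (bool, error) {
	_, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Range:  aws.String("bytes=0-0"), // assumed: smallest range GetObject accepts
	})
	if err != nil {
		var nsk *types.NoSuchKey
		if errors.As(err, &nsk) {
			// The key is simply absent; not an operational failure.
			return false, nil
		}
		return false, err
	}
	return true, nil
}
```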
@@ -459,7 +475,8 @@ func (rs *S3RemoteCache) hasObject(ctx context.Context, key string) (bool, error
 		return false, nil
 	}
 
-	// We've received an error that's not a simple missing key error. Collect more information
+	// We've received an error that's not a simple missing key error. Attempt to look up
+	// the cache bucket in case we're trying to use a bucket that doesn't exist.
 	hasBucket, _ := rs.hasBucket(ctx)
 	if !hasBucket {
 		return false, err
@@ -469,15 +486,13 @@ func (rs *S3RemoteCache) hasObject(ctx context.Context, key string) (bool, error
 		return false, err
 	}
 
-	// XXX
 	return true, nil
 }
 
 func (rs *S3RemoteCache) getObject(ctx context.Context, key string, path string) (int64, error) {
 
-	var partMiBs int64 = 10
 	downloader := manager.NewDownloader(rs.s3Client, func(d *manager.Downloader) {
-		d.PartSize = partMiBs * 1024 * 1024
+		d.PartSize = S3_PART_SIZE
 	})
 	buffer := manager.NewWriteAtBuffer([]byte{})
 	res, err := downloader.Download(context.TODO(), buffer, &s3.GetObjectInput{
@@ -502,9 +517,8 @@ func (rs *S3RemoteCache) uploadObject(ctx context.Context, key string, path stri
 		return nil, xerrors.Errorf("cannot open %s for S3 upload: %s", path, err)
 	}
 
-	var partMiBs int64 = 10
 	uploader := manager.NewUploader(rs.s3Client, func(u *manager.Uploader) {
-		u.PartSize = partMiBs * 1024 * 1024
+		u.PartSize = S3_PART_SIZE
 	})
 	res, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{
 		Bucket: aws.String(rs.BucketName),
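For completeness, since the hunk ends mid-struct: a self-contained sketch of what the rest of the upload path presumably looks like with the v2 transfer manager and the shared part size. Bucket, key, and file names are illustrative, and s3PartSize is an assumed mirror of S3_PART_SIZE:

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

const s3PartSize = 5 * 1024 * 1024 // assumed mirror of S3_PART_SIZE

// upload streams a local file to S3, splitting it into s3PartSize parts.
func upload(ctx context.Context, client *s3.Client, bucket, key, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	uploader := manager.NewUploader(client, func(u *manager.Uploader) {
		u.PartSize = s3PartSize
	})
	_, err = uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Body:   f,
	})
	return err
}

func main() {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		log.Fatal(err)
	}
	if err := upload(context.TODO(), s3.NewFromConfig(cfg), "my-cache-bucket", "pkg.tar.gz", "pkg.tar.gz"); err != nil {
		log.Fatal(err)
	}
}
```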