@@ -16,6 +16,7 @@ import (
 	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"github.com/containerd/containerd/content"
 	"github.com/containerd/containerd/labels"
@@ -44,6 +45,7 @@ const (
 	attrSecretAccessKey = "secret_access_key"
 	attrSessionToken    = "session_token"
 	attrUsePathStyle    = "use_path_style"
+	maxCopyObjectSize   = 5 * 1024 * 1024 * 1024
 )

 type Config struct {
@@ -203,13 +205,13 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
 		}

 		key := e.s3Client.blobKey(dgstPair.Descriptor.Digest)
-		exists, err := e.s3Client.exists(ctx, key)
+		exists, size, err := e.s3Client.exists(ctx, key)
 		if err != nil {
 			return nil, errors.Wrapf(err, "failed to check file presence in cache")
 		}
 		if exists != nil {
 			if time.Since(*exists) > e.config.TouchRefresh {
-				err = e.s3Client.touch(ctx, key)
+				err = e.s3Client.touch(ctx, key, size)
 				if err != nil {
 					return nil, errors.Wrapf(err, "failed to touch file")
 				}
@@ -449,7 +451,7 @@ func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io
 	return err
 }

-func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, error) {
+func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, *int64, error) {
 	input := &s3.HeadObjectInput{
 		Bucket: &s3Client.bucket,
 		Key:    &key,
@@ -458,26 +460,104 @@ func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, e
 	head, err := s3Client.HeadObject(ctx, input)
 	if err != nil {
 		if isNotFound(err) {
-			return nil, nil
+			return nil, nil, nil
 		}
-		return nil, err
+		return nil, nil, err
+	}
+	return head.LastModified, head.ContentLength, nil
+}
+
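+// buildCopySourceRange returns an HTTP range value ("bytes=start-end") that
+// covers at most maxCopyObjectSize bytes of an object of the given size.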
+func buildCopySourceRange(start int64, objectSize int64) string {
+	end := start + maxCopyObjectSize - 1
+	if end > objectSize {
+		end = objectSize - 1
 	}
-	return head.LastModified, nil
+	startRange := strconv.FormatInt(start, 10)
+	stopRange := strconv.FormatInt(end, 10)
+	return "bytes=" + startRange + "-" + stopRange
 }

-func (s3Client *s3Client) touch(ctx context.Context, key string) error {
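+// touch refreshes an object's last-modified time by copying it onto itself
+// with replaced metadata. Objects of maxCopyObjectSize (5 GiB) or more are
+// copied part by part, since a single CopyObject call cannot handle them.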
+func (s3Client *s3Client) touch(ctx context.Context, key string, size *int64) (err error) {
 	copySource := fmt.Sprintf("%s/%s", s3Client.bucket, key)
-	cp := &s3.CopyObjectInput{
-		Bucket:            &s3Client.bucket,
-		CopySource:        &copySource,
-		Key:               &key,
-		Metadata:          map[string]string{"updated-at": time.Now().String()},
-		MetadataDirective: "REPLACE",
+
+	// CopyObject does not support files > 5GB
+	if *size < maxCopyObjectSize {
+		cp := &s3.CopyObjectInput{
+			Bucket:            &s3Client.bucket,
+			CopySource:        &copySource,
+			Key:               &key,
+			Metadata:          map[string]string{"updated-at": time.Now().String()},
+			MetadataDirective: "REPLACE",
+		}
+
+		_, err := s3Client.CopyObject(ctx, cp)
+
+		return err
+	}
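+	// For larger objects, rewrite the object in place with a multipart upload
+	// whose parts are server-side copies (UploadPartCopy) of ranges of the source.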
+	input := &s3.CreateMultipartUploadInput{
+		Bucket: &s3Client.bucket,
+		Key:    &key,
 	}

-	_, err := s3Client.CopyObject(ctx, cp)
+	output, err := s3Client.CreateMultipartUpload(ctx, input)
+	if err != nil {
+		return err
+	}

-	return err
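+	// If any later step fails, abort the multipart upload so that incomplete
+	// parts are not left behind in the bucket.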
+	defer func() {
+		abortIn := s3.AbortMultipartUploadInput{
+			Bucket:   &s3Client.bucket,
+			Key:      &key,
+			UploadId: output.UploadId,
+		}
+		if err != nil {
+			s3Client.AbortMultipartUpload(ctx, &abortIn)
+		}
+	}()
+
+	var currentPartNumber int32 = 1
+	var currentPosition int64
+	var completedParts []types.CompletedPart
+
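+	// Copy the source object in ranges of up to maxCopyObjectSize bytes,
+	// recording each part's ETag for the completion request.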
+	for currentPosition < *size {
+		copyRange := buildCopySourceRange(currentPosition, *size)
+		partInput := s3.UploadPartCopyInput{
+			Bucket:          &s3Client.bucket,
+			CopySource:      &copySource,
+			CopySourceRange: &copyRange,
+			Key:             &key,
+			PartNumber:      &currentPartNumber,
+			UploadId:        output.UploadId,
+		}
+		uploadPartCopyResult, err := s3Client.UploadPartCopy(ctx, &partInput)
+		if err != nil {
+			return err
+		}
+		partNumber := new(int32)
+		*partNumber = currentPartNumber
+		completedParts = append(completedParts, types.CompletedPart{
+			ETag:       uploadPartCopyResult.CopyPartResult.ETag,
+			PartNumber: partNumber,
+		})
+
+		currentPartNumber++
+		currentPosition += maxCopyObjectSize
+	}
+
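+	// Completing the upload replaces the object under the same key, which
+	// refreshes its last-modified timestamp.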
+	completeMultipartUploadInput := &s3.CompleteMultipartUploadInput{
+		Bucket:   &s3Client.bucket,
+		Key:      &key,
+		UploadId: output.UploadId,
+		MultipartUpload: &types.CompletedMultipartUpload{
+			Parts: completedParts,
+		},
+	}
+
+	if _, err := s3Client.CompleteMultipartUpload(ctx, completeMultipartUploadInput); err != nil {
+		return err
+	}
+
+	return nil
 }

 func (s3Client *s3Client) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {