Skip to content

Commit 5677f67

Browse files
authored
Merge pull request moby#5266 from bpaquet/fix_4885
S3 Cache: Use multipart upload instead of CopyObject for touching file > 5GB
2 parents 53d77f7 + 6c40a3e commit 5677f67

File tree

2 files changed

+96
-15
lines changed

2 files changed

+96
-15
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,7 @@ Other options are:
576576
* `name=<manifest>`: specify name of the manifest to use (default `buildkit`)
577577
* Multiple manifest names can be specified at the same time, separated by `;`. The standard use case is to use the git sha1 as name, and the branch name as duplicate, and load both with 2 `import-cache` commands.
578578
* `ignore-error=<false|true>`: specify if error is ignored in case cache export fails (default: `false`)
579+
* `touch_refresh=24h`: Instead of being uploaded again when unchanged, blob files are "touched" on S3 every `touch_refresh` (default: 24h). This allows an expiration policy to be set on the S3 bucket to clean up unused files automatically. Manifest files are always rewritten, so there is no need to touch them.
579580

580581
`--import-cache` options:
581582
* `type=s3`

cache/remotecache/s3/s3.go

Lines changed: 95 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/aws/aws-sdk-go-v2/credentials"
1717
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
1818
"github.com/aws/aws-sdk-go-v2/service/s3"
19+
"github.com/aws/aws-sdk-go-v2/service/s3/types"
1920
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
2021
"github.com/containerd/containerd/content"
2122
"github.com/containerd/containerd/labels"
@@ -44,6 +45,7 @@ const (
4445
attrSecretAccessKey = "secret_access_key"
4546
attrSessionToken = "session_token"
4647
attrUsePathStyle = "use_path_style"
48+
maxCopyObjectSize = 5 * 1024 * 1024 * 1024
4749
)
4850

4951
type Config struct {
@@ -203,13 +205,13 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
203205
}
204206

205207
key := e.s3Client.blobKey(dgstPair.Descriptor.Digest)
206-
exists, err := e.s3Client.exists(ctx, key)
208+
exists, size, err := e.s3Client.exists(ctx, key)
207209
if err != nil {
208210
return nil, errors.Wrapf(err, "failed to check file presence in cache")
209211
}
210212
if exists != nil {
211213
if time.Since(*exists) > e.config.TouchRefresh {
212-
err = e.s3Client.touch(ctx, key)
214+
err = e.s3Client.touch(ctx, key, size)
213215
if err != nil {
214216
return nil, errors.Wrapf(err, "failed to touch file")
215217
}
@@ -449,7 +451,7 @@ func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io
449451
return err
450452
}
451453

452-
func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, error) {
454+
func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, *int64, error) {
453455
input := &s3.HeadObjectInput{
454456
Bucket: &s3Client.bucket,
455457
Key: &key,
@@ -458,26 +460,104 @@ func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, e
458460
head, err := s3Client.HeadObject(ctx, input)
459461
if err != nil {
460462
if isNotFound(err) {
461-
return nil, nil
463+
return nil, nil, nil
462464
}
463-
return nil, err
465+
return nil, nil, err
466+
}
467+
return head.LastModified, head.ContentLength, nil
468+
}
469+
470+
func buildCopySourceRange(start int64, objectSize int64) string {
471+
end := start + maxCopyObjectSize - 1
472+
if end > objectSize {
473+
end = objectSize - 1
464474
}
465-
return head.LastModified, nil
475+
startRange := strconv.FormatInt(start, 10)
476+
stopRange := strconv.FormatInt(end, 10)
477+
return "bytes=" + startRange + "-" + stopRange
466478
}
467479

468-
func (s3Client *s3Client) touch(ctx context.Context, key string) error {
480+
func (s3Client *s3Client) touch(ctx context.Context, key string, size *int64) (err error) {
469481
copySource := fmt.Sprintf("%s/%s", s3Client.bucket, key)
470-
cp := &s3.CopyObjectInput{
471-
Bucket: &s3Client.bucket,
472-
CopySource: &copySource,
473-
Key: &key,
474-
Metadata: map[string]string{"updated-at": time.Now().String()},
475-
MetadataDirective: "REPLACE",
482+
483+
// CopyObject does not support files > 5GB
484+
if *size < maxCopyObjectSize {
485+
cp := &s3.CopyObjectInput{
486+
Bucket: &s3Client.bucket,
487+
CopySource: &copySource,
488+
Key: &key,
489+
Metadata: map[string]string{"updated-at": time.Now().String()},
490+
MetadataDirective: "REPLACE",
491+
}
492+
493+
_, err := s3Client.CopyObject(ctx, cp)
494+
495+
return err
496+
}
497+
input := &s3.CreateMultipartUploadInput{
498+
Bucket: &s3Client.bucket,
499+
Key: &key,
476500
}
477501

478-
_, err := s3Client.CopyObject(ctx, cp)
502+
output, err := s3Client.CreateMultipartUpload(ctx, input)
503+
if err != nil {
504+
return err
505+
}
479506

480-
return err
507+
defer func() {
508+
abortIn := s3.AbortMultipartUploadInput{
509+
Bucket: &s3Client.bucket,
510+
Key: &key,
511+
UploadId: output.UploadId,
512+
}
513+
if err != nil {
514+
s3Client.AbortMultipartUpload(ctx, &abortIn)
515+
}
516+
}()
517+
518+
var currentPartNumber int32 = 1
519+
var currentPosition int64
520+
var completedParts []types.CompletedPart
521+
522+
for currentPosition < *size {
523+
copyRange := buildCopySourceRange(currentPosition, *size)
524+
partInput := s3.UploadPartCopyInput{
525+
Bucket: &s3Client.bucket,
526+
CopySource: &copySource,
527+
CopySourceRange: &copyRange,
528+
Key: &key,
529+
PartNumber: &currentPartNumber,
530+
UploadId: output.UploadId,
531+
}
532+
uploadPartCopyResult, err := s3Client.UploadPartCopy(ctx, &partInput)
533+
if err != nil {
534+
return err
535+
}
536+
partNumber := new(int32)
537+
*partNumber = currentPartNumber
538+
completedParts = append(completedParts, types.CompletedPart{
539+
ETag: uploadPartCopyResult.CopyPartResult.ETag,
540+
PartNumber: partNumber,
541+
})
542+
543+
currentPartNumber++
544+
currentPosition += maxCopyObjectSize
545+
}
546+
547+
completeMultipartUploadInput := &s3.CompleteMultipartUploadInput{
548+
Bucket: &s3Client.bucket,
549+
Key: &key,
550+
UploadId: output.UploadId,
551+
MultipartUpload: &types.CompletedMultipartUpload{
552+
Parts: completedParts,
553+
},
554+
}
555+
556+
if _, err := s3Client.CompleteMultipartUpload(ctx, completeMultipartUploadInput); err != nil {
557+
return err
558+
}
559+
560+
return nil
481561
}
482562

483563
func (s3Client *s3Client) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {

0 commit comments

Comments
 (0)