Skip to content

Commit 0f09abd

Browse files
authored
feat(casbackends): support for s3 compatible endpoints (minio, cloudflare R2, ...) (#1055)
Signed-off-by: Miguel Martinez Trivino <[email protected]>
1 parent eeddc26 commit 0f09abd

File tree

5 files changed

+272
-85
lines changed

5 files changed

+272
-85
lines changed

app/cli/cmd/casbackend_add_s3.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@
1616
package cmd
1717

1818
import (
19+
"fmt"
20+
1921
"github.com/chainloop-dev/chainloop/app/cli/internal/action"
2022
"github.com/chainloop-dev/chainloop/pkg/blobmanager/s3"
2123
"github.com/go-kratos/kratos/v2/log"
2224
"github.com/spf13/cobra"
2325
)
2426

2527
func newCASBackendAddAWSS3Cmd() *cobra.Command {
26-
var bucketName, accessKeyID, secretAccessKey, region string
28+
var bucketName, accessKeyID, secretAccessKey, region, endpoint string
2729
cmd := &cobra.Command{
2830
Use: "aws-s3",
2931
Short: "Register a AWS S3 storage bucket",
@@ -46,9 +48,15 @@ func newCASBackendAddAWSS3Cmd() *cobra.Command {
4648
}
4749
}
4850

51+
location := bucketName
52+
// If there is a custom endpoint we want to store it as part of the fqdn location
53+
if endpoint != "" {
54+
location = fmt.Sprintf("%s/%s", endpoint, bucketName)
55+
}
56+
4957
opts := &action.NewCASBackendAddOpts{
5058
Name: name,
51-
Location: bucketName,
59+
Location: location,
5260
Provider: s3.ProviderID,
5361
Description: description,
5462
Credentials: map[string]any{
@@ -83,8 +91,9 @@ func newCASBackendAddAWSS3Cmd() *cobra.Command {
8391
cobra.CheckErr(err)
8492

8593
cmd.Flags().StringVar(&region, "region", "", "AWS region for the bucket")
86-
err = cmd.MarkFlagRequired("region")
8794
cobra.CheckErr(err)
8895

96+
cmd.Flags().StringVar(&endpoint, "endpoint", "", "Custom Endpoint URL for other S3 compatible backends i.e MinIO")
97+
8998
return cmd
9099
}

pkg/blobmanager/s3/backend.go

Lines changed: 96 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//
2-
// Copyright 2023 The Chainloop Authors.
2+
// Copyright 2024 The Chainloop Authors.
33
//
44
// Licensed under the Apache License, Version 2.0 (the "License");
55
// you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
"fmt"
2424
"io"
25+
"net/url"
2526
"strings"
2627

2728
"github.com/aws/aws-sdk-go/aws"
@@ -40,28 +41,16 @@ const (
4041
)
4142

4243
type Backend struct {
43-
client *s3.S3
44-
bucket string
44+
client *s3.S3
45+
bucket string
46+
customEndpoint string
4547
}
4648

4749
var _ backend.UploaderDownloader = (*Backend)(nil)
4850

49-
type ConnOpt func(*aws.Config)
51+
const defaultRegion = "us-east-1"
5052

51-
// Optional endpoint configuration
52-
func WithEndpoint(endpoint string) ConnOpt {
53-
return func(cfg *aws.Config) {
54-
cfg.Endpoint = aws.String(endpoint)
55-
}
56-
}
57-
58-
func WithForcedS3PathStyle(force bool) ConnOpt {
59-
return func(cfg *aws.Config) {
60-
cfg.S3ForcePathStyle = aws.Bool(force)
61-
}
62-
}
63-
64-
func NewBackend(creds *Credentials, connOpts ...ConnOpt) (*Backend, error) {
53+
func NewBackend(creds *Credentials) (*Backend, error) {
6554
if creds == nil {
6655
return nil, errors.New("credentials cannot be nil")
6756
}
@@ -70,11 +59,27 @@ func NewBackend(creds *Credentials, connOpts ...ConnOpt) (*Backend, error) {
7059
return nil, fmt.Errorf("invalid credentials: %w", err)
7160
}
7261

62+
// Set a default region if not provided
63+
var region = defaultRegion
64+
if creds.Region != "" {
65+
region = creds.Region
66+
}
67+
7368
c := credentials.NewStaticCredentials(creds.AccessKeyID, creds.SecretAccessKey, "")
7469
// Configure AWS session
75-
cfg := &aws.Config{Credentials: c, Region: aws.String(creds.Region)}
76-
for _, opt := range connOpts {
77-
opt(cfg)
70+
cfg := &aws.Config{Credentials: c, Region: aws.String(region)}
71+
72+
// Bucket might contain the not only the bucket name but also the endpoint
73+
endpoint, bucket, err := extractLocationAndBucket(creds)
74+
if err != nil {
75+
return nil, fmt.Errorf("failed to parse bucket name: %w", err)
76+
}
77+
78+
// we have a custom endpoint
79+
// in some cases the server-side checksum verification is not supported like in the case of cloudflare r2
80+
if endpoint != "" {
81+
cfg.Endpoint = aws.String(endpoint)
82+
cfg.S3ForcePathStyle = aws.Bool(true)
7883
}
7984

8085
session, err := session.NewSession(cfg)
@@ -83,11 +88,55 @@ func NewBackend(creds *Credentials, connOpts ...ConnOpt) (*Backend, error) {
8388
}
8489

8590
return &Backend{
86-
client: s3.New(session),
87-
bucket: creds.BucketName,
91+
client: s3.New(session),
92+
bucket: bucket,
93+
customEndpoint: endpoint,
8894
}, nil
8995
}
9096

97+
// For now we are aware that the checksum verification is not supported by cloudflare r2
98+
// https://developers.cloudflare.com/r2/api/s3/api/
99+
func (b *Backend) checksumVerificationEnabled() bool {
100+
var enabled = true
101+
if b.customEndpoint != "" && strings.Contains(b.customEndpoint, "r2.cloudflarestorage.com") {
102+
enabled = false
103+
}
104+
105+
return enabled
106+
}
107+
108+
// Extract the custom endpoint and the bucket name from the location string
109+
// The location string can be either a bucket name or a URL
110+
// i.e bucket-name or https://custom-domain/bucket-name
111+
func extractLocationAndBucket(creds *Credentials) (string, string, error) {
112+
// Older versions of the credentials didn't have the location field
113+
// and just the bucket name was stored in the bucket name field
114+
if creds.BucketName != "" {
115+
return "", creds.BucketName, nil
116+
}
117+
118+
// Newer versions of the credentials have the location field which can contain the endpoint
119+
// so we override the bucket and set the endpoint if needed
120+
parsedLocation, err := url.Parse(creds.Location)
121+
if err != nil {
122+
return "", "", fmt.Errorf("failed to parse location: %w", err)
123+
}
124+
125+
host := parsedLocation.Host
126+
// It's a bucket name
127+
if host == "" {
128+
return "", creds.Location, nil
129+
}
130+
131+
endpoint := fmt.Sprintf("%s://%s", parsedLocation.Scheme, host)
132+
// It's a URL, extract bucket name from the path
133+
if pathSegments := strings.Split(parsedLocation.Path, "/"); len(pathSegments) > 1 {
134+
return endpoint, pathSegments[1], nil
135+
}
136+
137+
return "", "", fmt.Errorf("the location doesn't contain a bucket name")
138+
}
139+
91140
// Exists check that the artifact is already present in the repository
92141
func (b *Backend) Exists(ctx context.Context, digest string) (bool, error) {
93142
_, err := b.Describe(ctx, digest)
@@ -100,29 +149,41 @@ func (b *Backend) Exists(ctx context.Context, digest string) (bool, error) {
100149

101150
func (b *Backend) Upload(ctx context.Context, r io.Reader, resource *pb.CASResource) error {
102151
uploader := s3manager.NewUploaderWithClient(b.client)
103-
104-
_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
152+
input := &s3manager.UploadInput{
105153
Bucket: aws.String(b.bucket),
106154
Key: aws.String(resourceName(resource.Digest)),
107155
Body: r,
108-
// Check that the object is uploaded correctly
109-
ChecksumSHA256: aws.String(hexSha256ToBinaryB64(resource.Digest)),
110156
Metadata: map[string]*string{
111157
annotationNameAuthor: aws.String(backend.AuthorAnnotation),
112158
annotationNameFilename: aws.String(resource.FileName),
113159
},
114-
})
160+
}
115161

116-
return err
162+
if b.checksumVerificationEnabled() {
163+
// Check that the object is uploaded correctly
164+
input.ChecksumSHA256 = aws.String(hexSha256ToBinaryB64(resource.Digest))
165+
}
166+
167+
if _, err := uploader.UploadWithContext(ctx, input); err != nil {
168+
return fmt.Errorf("failed to upload to bucket: %w", err)
169+
}
170+
171+
return nil
117172
}
118173

119174
func (b *Backend) Describe(ctx context.Context, digest string) (*pb.CASResource, error) {
120-
// and read the object back + validate integrity
121-
resp, err := b.client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
122-
Bucket: aws.String(b.bucket),
123-
Key: aws.String(resourceName(digest)),
124-
ChecksumMode: aws.String("ENABLED"),
125-
})
175+
input := &s3.HeadObjectInput{
176+
Bucket: aws.String(b.bucket),
177+
Key: aws.String(resourceName(digest)),
178+
}
179+
180+
if b.checksumVerificationEnabled() {
181+
// Enable checksum verification
182+
input.ChecksumMode = aws.String("ENABLED")
183+
}
184+
185+
// and read the object back
186+
resp, err := b.client.HeadObjectWithContext(ctx, input)
126187

127188
// check error is aws error
128189
var awsErr awserr.Error

pkg/blobmanager/s3/backend_test.go

Lines changed: 102 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//
2-
// Copyright 2023 The Chainloop Authors.
2+
// Copyright 2024 The Chainloop Authors.
33
//
44
// Licensed under the Apache License, Version 2.0 (the "License");
55
// you may not use this file except in compliance with the License.
@@ -129,6 +129,101 @@ func (s *testSuite) TestDescribe() {
129129
s.Equal(int64(4), artifact.Size)
130130
})
131131
}
132+
func (s *testSuite) TestChecksumVerificationEnabled() {
133+
testCases := []struct {
134+
name string
135+
customEndpoint string
136+
expected bool
137+
}{
138+
{
139+
name: "no endpoint, a.k.a AWS",
140+
customEndpoint: "",
141+
expected: true,
142+
},
143+
{
144+
name: "custom endpoint, i.e minio",
145+
customEndpoint: s.minio.ConnectionString(s.T()),
146+
expected: true,
147+
},
148+
{
149+
name: "custom endpoint",
150+
customEndpoint: "https://123.r2.cloudflarestorage.com/bucket-name",
151+
expected: false,
152+
},
153+
}
154+
155+
for _, tc := range testCases {
156+
s.Run(tc.name, func() {
157+
b := &Backend{customEndpoint: tc.customEndpoint}
158+
s.Equal(tc.expected, b.checksumVerificationEnabled())
159+
})
160+
}
161+
}
162+
163+
func (s *testSuite) TestExtractLocationAndBucket() {
164+
type expected struct {
165+
endpoint string
166+
bucket string
167+
err string
168+
}
169+
170+
testCases := []struct {
171+
name string
172+
creds *Credentials
173+
expected *expected
174+
}{
175+
{
176+
name: "no location",
177+
creds: &Credentials{
178+
BucketName: "bucket",
179+
},
180+
expected: &expected{
181+
bucket: "bucket",
182+
},
183+
},
184+
{
185+
name: "location is a bucket name",
186+
creds: &Credentials{
187+
Location: "bucket",
188+
},
189+
expected: &expected{
190+
bucket: "bucket",
191+
},
192+
},
193+
{
194+
name: "location is a URL",
195+
creds: &Credentials{
196+
Location: "https://custom-domain/bucket",
197+
},
198+
expected: &expected{
199+
endpoint: "https://custom-domain",
200+
bucket: "bucket",
201+
},
202+
},
203+
{
204+
name: "invalid URL",
205+
creds: &Credentials{
206+
Location: "https://custom-domain",
207+
},
208+
expected: &expected{
209+
err: "doesn't contain a bucket name",
210+
},
211+
},
212+
}
213+
214+
for _, tc := range testCases {
215+
s.Run(tc.name, func() {
216+
endpoint, bucket, err := extractLocationAndBucket(tc.creds)
217+
if tc.expected.err != "" {
218+
s.ErrorContains(err, tc.expected.err)
219+
} else {
220+
s.NoError(err)
221+
s.Equal(tc.expected.endpoint, endpoint)
222+
s.Equal(tc.expected.bucket, bucket)
223+
}
224+
})
225+
}
226+
}
132227

133228
func (s *testSuite) TestDownload() {
134229
s.T().Run("exist but not uploaded by Chainloop", func(t *testing.T) {
@@ -183,14 +278,15 @@ const testBucket = "test-bucket"
183278

184279
func (s *testSuite) SetupTest() {
185280
s.minio = newMinioInstance(s.T())
281+
location := fmt.Sprintf("http://%s/%s", s.minio.ConnectionString(s.T()), testBucket)
186282

187283
// Create backend
188284
backend, err := NewBackend(&Credentials{
189285
AccessKeyID: "root",
190286
SecretAccessKey: "test-password",
191287
Region: "us-east-1",
192-
BucketName: testBucket,
193-
}, WithEndpoint(fmt.Sprintf("http://%s", s.minio.ConnectionString(s.T()))), WithForcedS3PathStyle(true))
288+
Location: location,
289+
})
194290
require.NoError(s.T(), err)
195291
s.backend = backend
196292

@@ -199,7 +295,9 @@ func (s *testSuite) SetupTest() {
199295
SecretAccessKey: "wrong-password",
200296
Region: "us-east-1",
201297
BucketName: testBucket,
202-
}, WithEndpoint(fmt.Sprintf("http://%s", s.minio.ConnectionString(s.T()))), WithForcedS3PathStyle(true))
298+
Location: location,
299+
})
300+
203301
require.NoError(s.T(), err)
204302
s.invalidBackend = invalidBackend
205303

0 commit comments

Comments
 (0)