Skip to content

Commit b5c9503

Browse files
authored
Merge branch 'main' into dapr-state-store-clickhouse
2 parents d544f89 + 61cdce8 commit b5c9503

File tree

28 files changed

+1075
-403
lines changed

28 files changed

+1075
-403
lines changed

.build-tools/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/dapr/components-contrib/build-tools
22

3-
go 1.24.1
3+
go 1.24.4
44

55
require (
66
github.com/dapr/components-contrib v0.0.0

bindings/aws/s3/s3.go

Lines changed: 84 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,20 @@ import (
2424
"net/http"
2525
"os"
2626
"reflect"
27+
"slices"
2728
"strings"
2829
"time"
2930

31+
"github.com/aws/aws-sdk-go-v2/service/s3/types"
3032
"github.com/aws/aws-sdk-go/aws"
31-
"github.com/aws/aws-sdk-go/aws/awserr"
32-
"github.com/aws/aws-sdk-go/service/s3"
33-
"github.com/aws/aws-sdk-go/service/s3/s3manager"
33+
awsCommon "github.com/dapr/components-contrib/common/aws"
34+
awsCommonAuth "github.com/dapr/components-contrib/common/aws/auth"
35+
36+
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
37+
"github.com/aws/aws-sdk-go-v2/service/s3"
3438
"github.com/google/uuid"
3539

3640
"github.com/dapr/components-contrib/bindings"
37-
awsAuth "github.com/dapr/components-contrib/common/authentication/aws"
3841
commonutils "github.com/dapr/components-contrib/common/utils"
3942
"github.com/dapr/components-contrib/metadata"
4043
"github.com/dapr/kit/logger"
@@ -60,9 +63,12 @@ const (
6063

6164
// AWSS3 is a binding for an AWS S3 storage bucket.
6265
type AWSS3 struct {
63-
metadata *s3Metadata
64-
authProvider awsAuth.Provider
65-
logger logger.Logger
66+
metadata *s3Metadata
67+
logger logger.Logger
68+
s3Client *s3.Client
69+
s3Uploader *manager.Uploader
70+
s3Downloader *manager.Downloader
71+
s3PresignClient *s3.PresignClient
6672
}
6773

6874
type s3Metadata struct {
@@ -106,10 +112,42 @@ func NewAWSS3(logger logger.Logger) bindings.OutputBinding {
106112
return &AWSS3{logger: logger}
107113
}
108114

109-
func (s *AWSS3) getAWSConfig(opts awsAuth.Options) *aws.Config {
110-
cfg := awsAuth.GetConfig(opts).WithS3ForcePathStyle(s.metadata.ForcePathStyle).WithDisableSSL(s.metadata.DisableSSL)
115+
// Init does metadata parsing and connection creation.
116+
func (s *AWSS3) Init(ctx context.Context, metadata bindings.Metadata) error {
117+
m, err := s.parseMetadata(metadata)
118+
if err != nil {
119+
return err
120+
}
121+
s.metadata = m
122+
123+
authOpts := awsCommonAuth.Options{
124+
Logger: s.logger,
125+
126+
Properties: metadata.Properties,
127+
128+
Region: m.Region,
129+
Endpoint: m.Endpoint,
130+
AccessKey: m.AccessKey,
131+
SecretKey: m.SecretKey,
132+
SessionToken: m.SessionToken,
133+
}
134+
135+
var configOptions []awsCommon.ConfigOption
136+
137+
var s3Options []func(options *s3.Options)
138+
139+
if s.metadata.DisableSSL {
140+
s3Options = append(s3Options, func(options *s3.Options) {
141+
options.EndpointOptions.DisableHTTPS = true
142+
})
143+
}
144+
145+
if !s.metadata.ForcePathStyle {
146+
s3Options = append(s3Options, func(options *s3.Options) {
147+
options.UsePathStyle = true
148+
})
149+
}
111150

112-
// Use a custom HTTP client to allow self-signed certs
113151
if s.metadata.InsecureSSL {
114152
customTransport := http.DefaultTransport.(*http.Transport).Clone()
115153
customTransport.TLSClientConfig = &tls.Config{
@@ -119,44 +157,27 @@ func (s *AWSS3) getAWSConfig(opts awsAuth.Options) *aws.Config {
119157
client := &http.Client{
120158
Transport: customTransport,
121159
}
122-
cfg = cfg.WithHTTPClient(client)
160+
configOptions = append(configOptions, awsCommon.WithHTTPClient(client))
123161

124162
s.logger.Infof("aws s3: you are using 'insecureSSL' to skip server config verify which is unsafe!")
125163
}
126-
return cfg
127-
}
128164

129-
// Init does metadata parsing and connection creation.
130-
func (s *AWSS3) Init(ctx context.Context, metadata bindings.Metadata) error {
131-
m, err := s.parseMetadata(metadata)
165+
awsConfig, err := awsCommon.NewConfig(ctx, authOpts, configOptions...)
132166
if err != nil {
133-
return err
167+
return fmt.Errorf("s3 binding error: failed to create AWS config: %w", err)
134168
}
135-
s.metadata = m
136169

137-
opts := awsAuth.Options{
138-
Logger: s.logger,
139-
Properties: metadata.Properties,
140-
Region: m.Region,
141-
Endpoint: m.Endpoint,
142-
AccessKey: m.AccessKey,
143-
SecretKey: m.SecretKey,
144-
SessionToken: m.SessionToken,
145-
}
146-
// extra configs needed per component type
147-
provider, err := awsAuth.NewProvider(ctx, opts, s.getAWSConfig(opts))
148-
if err != nil {
149-
return err
150-
}
151-
s.authProvider = provider
170+
s.s3Client = s3.NewFromConfig(awsConfig, s3Options...)
171+
172+
s.s3Uploader = manager.NewUploader(s.s3Client)
173+
s.s3Downloader = manager.NewDownloader(s.s3Client)
174+
175+
s.s3PresignClient = s3.NewPresignClient(s.s3Client)
152176

153177
return nil
154178
}
155179

156180
func (s *AWSS3) Close() error {
157-
if s.authProvider != nil {
158-
return s.authProvider.Close()
159-
}
160181
return nil
161182
}
162183

@@ -215,19 +236,25 @@ func (s *AWSS3) create(ctx context.Context, req *bindings.InvokeRequest) (*bindi
215236
r = b64.NewDecoder(b64.StdEncoding, r)
216237
}
217238

218-
var storageClass *string
239+
var storageClass types.StorageClass
219240
if metadata.StorageClass != "" {
220-
storageClass = aws.String(metadata.StorageClass)
241+
// assert storageclass exists in the types.storageclass.values() slice
242+
storageClass = types.StorageClass(strings.ToUpper(metadata.StorageClass))
243+
if !slices.Contains(storageClass.Values(), storageClass) {
244+
return nil, fmt.Errorf("s3 binding error: invalid storage class '%s' provided", metadata.StorageClass)
245+
}
221246
}
222247

223-
resultUpload, err := s.authProvider.S3().Uploader.UploadWithContext(ctx, &s3manager.UploadInput{
248+
s3UploaderPutObjectInput := &s3.PutObjectInput{
224249
Bucket: ptr.Of(metadata.Bucket),
225250
Key: ptr.Of(key),
226251
Body: r,
227252
ContentType: contentType,
228253
StorageClass: storageClass,
229254
Tagging: tagging,
230-
})
255+
}
256+
257+
resultUpload, err := s.s3Uploader.Upload(ctx, s3UploaderPutObjectInput)
231258
if err != nil {
232259
return nil, fmt.Errorf("s3 binding error: uploading failed: %w", err)
233260
}
@@ -296,16 +323,21 @@ func (s *AWSS3) presignObject(ctx context.Context, bucket, key, ttl string) (str
296323
if err != nil {
297324
return "", fmt.Errorf("s3 binding error: cannot parse duration %s: %w", ttl, err)
298325
}
299-
objReq, _ := s.authProvider.S3().S3.GetObjectRequest(&s3.GetObjectInput{
326+
s3GetObjectInput := &s3.GetObjectInput{
300327
Bucket: ptr.Of(bucket),
301328
Key: ptr.Of(key),
302-
})
303-
url, err := objReq.Presign(d)
329+
}
330+
331+
presignedObjectRequest, err := s.s3PresignClient.PresignGetObject(
332+
ctx,
333+
s3GetObjectInput,
334+
s3.WithPresignExpires(d),
335+
)
304336
if err != nil {
305337
return "", fmt.Errorf("s3 binding error: failed to presign URL: %w", err)
306338
}
307339

308-
return url, nil
340+
return presignedObjectRequest.URL, nil
309341
}
310342

311343
func (s *AWSS3) get(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
@@ -320,16 +352,16 @@ func (s *AWSS3) get(ctx context.Context, req *bindings.InvokeRequest) (*bindings
320352
}
321353

322354
buff := &aws.WriteAtBuffer{}
323-
_, err = s.authProvider.S3().Downloader.DownloadWithContext(ctx,
355+
_, err = s.s3Downloader.Download(ctx,
324356
buff,
325357
&s3.GetObjectInput{
326358
Bucket: ptr.Of(s.metadata.Bucket),
327359
Key: ptr.Of(key),
328360
},
329361
)
330362
if err != nil {
331-
var awsErr awserr.Error
332-
if errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey {
363+
var awsErr *types.NoSuchKey
364+
if errors.As(err, &awsErr) {
333365
return nil, errors.New("object not found")
334366
}
335367
return nil, fmt.Errorf("s3 binding error: error downloading S3 object: %w", err)
@@ -354,16 +386,16 @@ func (s *AWSS3) delete(ctx context.Context, req *bindings.InvokeRequest) (*bindi
354386
if key == "" {
355387
return nil, fmt.Errorf("s3 binding error: required metadata '%s' missing", metadataKey)
356388
}
357-
_, err := s.authProvider.S3().S3.DeleteObjectWithContext(
389+
_, err := s.s3Client.DeleteObject(
358390
ctx,
359391
&s3.DeleteObjectInput{
360392
Bucket: ptr.Of(s.metadata.Bucket),
361393
Key: ptr.Of(key),
362394
},
363395
)
364396
if err != nil {
365-
var awsErr awserr.Error
366-
if errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey {
397+
var awsErr *types.NoSuchKey
398+
if errors.As(err, &awsErr) {
367399
return nil, errors.New("object not found")
368400
}
369401
return nil, fmt.Errorf("s3 binding error: delete operation failed: %w", err)
@@ -383,9 +415,9 @@ func (s *AWSS3) list(ctx context.Context, req *bindings.InvokeRequest) (*binding
383415
if payload.MaxResults < 1 {
384416
payload.MaxResults = defaultMaxResults
385417
}
386-
result, err := s.authProvider.S3().S3.ListObjectsWithContext(ctx, &s3.ListObjectsInput{
418+
result, err := s.s3Client.ListObjects(ctx, &s3.ListObjectsInput{
387419
Bucket: ptr.Of(s.metadata.Bucket),
388-
MaxKeys: ptr.Of(int64(payload.MaxResults)),
420+
MaxKeys: ptr.Of(payload.MaxResults),
389421
Marker: ptr.Of(payload.Marker),
390422
Prefix: ptr.Of(payload.Prefix),
391423
Delimiter: ptr.Of(payload.Delimiter),

bindings/kafka/metadata.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,3 +372,10 @@ metadata:
372372
- "range"
373373
- "sticky"
374374
- "roundrobin"
375+
- name: excludeHeaderMetaRegex
376+
type: string
377+
required: false
378+
description: |
379+
A regular expression to exclude keys from being converted to/from headers from/to metadata to avoid unwanted downstream side effects.
380+
example: '"^rawPayload|valueSchemaType$"'
381+
default: '""'

common/aws/auth/auth.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package auth
2+
3+
import (
4+
"context"
5+
6+
"github.com/aws/aws-sdk-go-v2/aws"
7+
"github.com/aws/aws-sdk-go-v2/config"
8+
9+
"github.com/dapr/kit/logger"
10+
)
11+
12+
type ProviderType int
13+
14+
const (
15+
StaticProviderTypeStatic ProviderType = iota
16+
StaticProviderTypeAssumeRole
17+
X509ProviderType
18+
ProviderTypeUnknown // Or default
19+
)
20+
21+
type Options struct {
22+
Logger logger.Logger
23+
Properties map[string]string
24+
25+
Region string `json:"region" mapstructure:"region" mapstructurealiases:"awsRegion"`
26+
AccessKey string `json:"accessKey" mapstructure:"accessKey"`
27+
SecretKey string `json:"secretKey" mapstructure:"secretKey"`
28+
SessionToken string `json:"sessionToken" mapstructure:"sessionToken"`
29+
AssumeRoleArn string `json:"assumeRoleArn" mapstructure:"assumeRoleArn"`
30+
TrustAnchorArn string `json:"trustAnchorArn" mapstructure:"trustAnchorArn"`
31+
TrustProfileArn string `json:"trustProfileArn" mapstructure:"trustProfileArn"`
32+
33+
Endpoint string `json:"endpoint" mapstructure:"endpoint"`
34+
}
35+
36+
// CredentialProvider provides an interface for retrieving AWS credentials.
37+
type CredentialProvider interface {
38+
Retrieve(ctx context.Context) (aws.Credentials, error)
39+
Type() ProviderType
40+
}
41+
42+
func NewCredentialProvider(ctx context.Context, opts Options, configOpts []func(*config.LoadOptions) error) (CredentialProvider,
43+
error,
44+
) {
45+
// TODO: Refactor this to search the opts structure for the right fields rather than the metadata map
46+
if isX509Auth(opts.Properties) {
47+
return newAuthX509(ctx, opts)
48+
}
49+
return newAuthStatic(ctx, opts, configOpts)
50+
}

common/aws/auth/auth_static.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package auth
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/aws/aws-sdk-go-v2/aws"
8+
"github.com/aws/aws-sdk-go-v2/config"
9+
"github.com/aws/aws-sdk-go-v2/credentials"
10+
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
11+
"github.com/aws/aws-sdk-go-v2/service/sts"
12+
13+
"github.com/dapr/kit/logger"
14+
)
15+
16+
type Static struct {
17+
ProviderType ProviderType
18+
Logger logger.Logger
19+
AccessKey string
20+
SecretKey string
21+
SessionToken string
22+
Region string
23+
Endpoint string
24+
AssumeRoleArn string
25+
26+
CredentialProvider aws.CredentialsProvider
27+
}
28+
29+
func (a *Static) Retrieve(ctx context.Context) (aws.Credentials, error) {
30+
if a.CredentialProvider == nil {
31+
return aws.Credentials{}, errors.New("credential provider is not set")
32+
}
33+
return a.CredentialProvider.Retrieve(ctx)
34+
}
35+
36+
func (a *Static) Type() ProviderType {
37+
return a.ProviderType
38+
}
39+
40+
func newAuthStatic(ctx context.Context, opts Options, configOpts []func(*config.LoadOptions) error) (CredentialProvider, error) {
41+
static := &Static{
42+
Logger: opts.Logger,
43+
AccessKey: opts.AccessKey,
44+
SecretKey: opts.SecretKey,
45+
SessionToken: opts.SessionToken,
46+
Region: opts.Region,
47+
Endpoint: opts.Endpoint,
48+
AssumeRoleArn: opts.AssumeRoleArn,
49+
}
50+
51+
switch {
52+
case static.AccessKey != "" && static.SecretKey != "" && static.SessionToken != "":
53+
static.ProviderType = StaticProviderTypeStatic
54+
static.CredentialProvider = credentials.NewStaticCredentialsProvider(opts.AccessKey, opts.SecretKey,
55+
opts.SessionToken)
56+
static.Logger.Debug("using static credentials provider")
57+
58+
case static.AssumeRoleArn != "":
59+
awsCfg, err := config.LoadDefaultConfig(ctx, configOpts...)
60+
if err != nil {
61+
return nil, err
62+
}
63+
64+
stsSvc := sts.NewFromConfig(awsCfg)
65+
stsProvider := stscreds.NewAssumeRoleProvider(stsSvc, static.AssumeRoleArn)
66+
static.ProviderType = StaticProviderTypeAssumeRole
67+
static.CredentialProvider = stsProvider
68+
static.Logger.Debug("using AssumeRole credentials provider")
69+
70+
default:
71+
static.Logger.Debug("using an undefined credentials provider, this may lead to unexpected behavior")
72+
static.ProviderType = ProviderTypeUnknown
73+
}
74+
75+
return static, nil
76+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package auth

0 commit comments

Comments
 (0)