
Commit be15d17

Merge branch 'main' into 3318-RavenDB-state-store-new
2 parents 5c74fe5 + 1fdd753

43 files changed: +1214 -503 lines changed


.build-tools/go.mod

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 module github.com/dapr/components-contrib/build-tools
 
-go 1.24.1
+go 1.24.4
 
 require (
 	github.com/dapr/components-contrib v0.0.0

.github/infrastructure/docker-compose-coherence.yml

Lines changed: 1 addition & 1 deletion

@@ -4,7 +4,7 @@ services:
     environment:
       - coherence.management.http=all
       - coherence.management.http.port=30000
-      - Dcoherence.health.http.port=6676
+      - coherence.health.http.port=6676
       - coherence.wka=127.0.0.1
     ports:
       - 30000:30000

bindings/aws/s3/s3.go

Lines changed: 84 additions & 52 deletions

@@ -24,17 +24,20 @@ import (
 	"net/http"
 	"os"
 	"reflect"
+	"slices"
 	"strings"
 	"time"
 
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"github.com/aws/aws-sdk-go/aws"
-	"github.com/aws/aws-sdk-go/aws/awserr"
-	"github.com/aws/aws-sdk-go/service/s3"
-	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	awsCommon "github.com/dapr/components-contrib/common/aws"
+	awsCommonAuth "github.com/dapr/components-contrib/common/aws/auth"
+
+	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
 	"github.com/google/uuid"
 
 	"github.com/dapr/components-contrib/bindings"
-	awsAuth "github.com/dapr/components-contrib/common/authentication/aws"
 	commonutils "github.com/dapr/components-contrib/common/utils"
 	"github.com/dapr/components-contrib/metadata"
 	"github.com/dapr/kit/logger"
@@ -60,9 +63,12 @@ const (
 
 // AWSS3 is a binding for an AWS S3 storage bucket.
 type AWSS3 struct {
-	metadata     *s3Metadata
-	authProvider awsAuth.Provider
-	logger       logger.Logger
+	metadata        *s3Metadata
+	logger          logger.Logger
+	s3Client        *s3.Client
+	s3Uploader      *manager.Uploader
+	s3Downloader    *manager.Downloader
+	s3PresignClient *s3.PresignClient
 }
 
 type s3Metadata struct {
@@ -106,10 +112,42 @@ func NewAWSS3(logger logger.Logger) bindings.OutputBinding {
 	return &AWSS3{logger: logger}
 }
 
-func (s *AWSS3) getAWSConfig(opts awsAuth.Options) *aws.Config {
-	cfg := awsAuth.GetConfig(opts).WithS3ForcePathStyle(s.metadata.ForcePathStyle).WithDisableSSL(s.metadata.DisableSSL)
+// Init does metadata parsing and connection creation.
+func (s *AWSS3) Init(ctx context.Context, metadata bindings.Metadata) error {
+	m, err := s.parseMetadata(metadata)
+	if err != nil {
+		return err
+	}
+	s.metadata = m
+
+	authOpts := awsCommonAuth.Options{
+		Logger: s.logger,
+
+		Properties: metadata.Properties,
+
+		Region:       m.Region,
+		Endpoint:     m.Endpoint,
+		AccessKey:    m.AccessKey,
+		SecretKey:    m.SecretKey,
+		SessionToken: m.SessionToken,
+	}
+
+	var configOptions []awsCommon.ConfigOption
+
+	var s3Options []func(options *s3.Options)
+
+	if s.metadata.DisableSSL {
+		s3Options = append(s3Options, func(options *s3.Options) {
+			options.EndpointOptions.DisableHTTPS = true
+		})
+	}
+
+	if !s.metadata.ForcePathStyle {
+		s3Options = append(s3Options, func(options *s3.Options) {
+			options.UsePathStyle = true
+		})
+	}
 
-	// Use a custom HTTP client to allow self-signed certs
 	if s.metadata.InsecureSSL {
 		customTransport := http.DefaultTransport.(*http.Transport).Clone()
 		customTransport.TLSClientConfig = &tls.Config{
@@ -119,44 +157,27 @@ func (s *AWSS3) getAWSConfig(opts awsAuth.Options) *aws.Config {
 		client := &http.Client{
 			Transport: customTransport,
 		}
-		cfg = cfg.WithHTTPClient(client)
+		configOptions = append(configOptions, awsCommon.WithHTTPClient(client))
 
 		s.logger.Infof("aws s3: you are using 'insecureSSL' to skip server config verify which is unsafe!")
 	}
-	return cfg
-}
 
-// Init does metadata parsing and connection creation.
-func (s *AWSS3) Init(ctx context.Context, metadata bindings.Metadata) error {
-	m, err := s.parseMetadata(metadata)
+	awsConfig, err := awsCommon.NewConfig(ctx, authOpts, configOptions...)
 	if err != nil {
-		return err
+		return fmt.Errorf("s3 binding error: failed to create AWS config: %w", err)
 	}
-	s.metadata = m
 
-	opts := awsAuth.Options{
-		Logger:       s.logger,
-		Properties:   metadata.Properties,
-		Region:       m.Region,
-		Endpoint:     m.Endpoint,
-		AccessKey:    m.AccessKey,
-		SecretKey:    m.SecretKey,
-		SessionToken: m.SessionToken,
-	}
-	// extra configs needed per component type
-	provider, err := awsAuth.NewProvider(ctx, opts, s.getAWSConfig(opts))
-	if err != nil {
-		return err
-	}
-	s.authProvider = provider
+	s.s3Client = s3.NewFromConfig(awsConfig, s3Options...)
+
+	s.s3Uploader = manager.NewUploader(s.s3Client)
+	s.s3Downloader = manager.NewDownloader(s.s3Client)
+
+	s.s3PresignClient = s3.NewPresignClient(s.s3Client)
 
 	return nil
 }
 
 func (s *AWSS3) Close() error {
-	if s.authProvider != nil {
-		return s.authProvider.Close()
-	}
 	return nil
 }
 
@@ -215,19 +236,25 @@ func (s *AWSS3) create(ctx context.Context, req *bindings.InvokeRequest) (*bindi
 		r = b64.NewDecoder(b64.StdEncoding, r)
 	}
 
-	var storageClass *string
+	var storageClass types.StorageClass
 	if metadata.StorageClass != "" {
-		storageClass = aws.String(metadata.StorageClass)
+		// assert storageclass exists in the types.storageclass.values() slice
+		storageClass = types.StorageClass(strings.ToUpper(metadata.StorageClass))
+		if !slices.Contains(storageClass.Values(), storageClass) {
+			return nil, fmt.Errorf("s3 binding error: invalid storage class '%s' provided", metadata.StorageClass)
+		}
 	}
 
-	resultUpload, err := s.authProvider.S3().Uploader.UploadWithContext(ctx, &s3manager.UploadInput{
+	s3UploaderPutObjectInput := &s3.PutObjectInput{
 		Bucket:       ptr.Of(metadata.Bucket),
 		Key:          ptr.Of(key),
 		Body:         r,
 		ContentType:  contentType,
 		StorageClass: storageClass,
 		Tagging:      tagging,
-	})
+	}
+
+	resultUpload, err := s.s3Uploader.Upload(ctx, s3UploaderPutObjectInput)
 	if err != nil {
 		return nil, fmt.Errorf("s3 binding error: uploading failed: %w", err)
 	}
@@ -296,16 +323,21 @@ func (s *AWSS3) presignObject(ctx context.Context, bucket, key, ttl string) (str
 	if err != nil {
 		return "", fmt.Errorf("s3 binding error: cannot parse duration %s: %w", ttl, err)
 	}
-	objReq, _ := s.authProvider.S3().S3.GetObjectRequest(&s3.GetObjectInput{
+	s3GetObjectInput := &s3.GetObjectInput{
 		Bucket: ptr.Of(bucket),
 		Key:    ptr.Of(key),
-	})
-	url, err := objReq.Presign(d)
+	}
+
+	presignedObjectRequest, err := s.s3PresignClient.PresignGetObject(
+		ctx,
+		s3GetObjectInput,
+		s3.WithPresignExpires(d),
+	)
 	if err != nil {
 		return "", fmt.Errorf("s3 binding error: failed to presign URL: %w", err)
 	}
 
-	return url, nil
+	return presignedObjectRequest.URL, nil
 }
 
 func (s *AWSS3) get(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
@@ -320,16 +352,16 @@ func (s *AWSS3) get(ctx context.Context, req *bindings.InvokeRequest) (*bindings
 	}
 
 	buff := &aws.WriteAtBuffer{}
-	_, err = s.authProvider.S3().Downloader.DownloadWithContext(ctx,
+	_, err = s.s3Downloader.Download(ctx,
 		buff,
 		&s3.GetObjectInput{
 			Bucket: ptr.Of(s.metadata.Bucket),
 			Key:    ptr.Of(key),
 		},
 	)
 	if err != nil {
-		var awsErr awserr.Error
-		if errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey {
+		var awsErr *types.NoSuchKey
+		if errors.As(err, &awsErr) {
 			return nil, errors.New("object not found")
 		}
 		return nil, fmt.Errorf("s3 binding error: error downloading S3 object: %w", err)
@@ -354,16 +386,16 @@ func (s *AWSS3) delete(ctx context.Context, req *bindings.InvokeRequest) (*bindi
 	if key == "" {
 		return nil, fmt.Errorf("s3 binding error: required metadata '%s' missing", metadataKey)
 	}
-	_, err := s.authProvider.S3().S3.DeleteObjectWithContext(
+	_, err := s.s3Client.DeleteObject(
 		ctx,
 		&s3.DeleteObjectInput{
 			Bucket: ptr.Of(s.metadata.Bucket),
 			Key:    ptr.Of(key),
 		},
 	)
 	if err != nil {
-		var awsErr awserr.Error
-		if errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey {
+		var awsErr *types.NoSuchKey
+		if errors.As(err, &awsErr) {
 			return nil, errors.New("object not found")
 		}
 		return nil, fmt.Errorf("s3 binding error: delete operation failed: %w", err)
@@ -383,9 +415,9 @@ func (s *AWSS3) list(ctx context.Context, req *bindings.InvokeRequest) (*binding
 	if payload.MaxResults < 1 {
 		payload.MaxResults = defaultMaxResults
 	}
-	result, err := s.authProvider.S3().S3.ListObjectsWithContext(ctx, &s3.ListObjectsInput{
+	result, err := s.s3Client.ListObjects(ctx, &s3.ListObjectsInput{
 		Bucket:    ptr.Of(s.metadata.Bucket),
-		MaxKeys:   ptr.Of(int64(payload.MaxResults)),
+		MaxKeys:   ptr.Of(payload.MaxResults),
 		Marker:    ptr.Of(payload.Marker),
 		Prefix:    ptr.Of(payload.Prefix),
 		Delimiter: ptr.Of(payload.Delimiter),
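
Note: the hunks above replace the v1 SDK's GetObjectRequest/Presign and s3manager flow with aws-sdk-go-v2 clients held on the AWSS3 struct. A minimal standalone sketch of that v2 pattern (not the binding's code; the bucket, key, and 15-minute TTL are placeholder values):

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	ctx := context.Background()

	// Load the default AWS config (env vars, shared config, IAM role, ...).
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// One client, plus the helpers the binding now keeps on its struct.
	client := s3.NewFromConfig(cfg)
	uploader := manager.NewUploader(client) // replaces s3manager.Uploader
	presigner := s3.NewPresignClient(client) // replaces GetObjectRequest + Presign
	_ = uploader

	// Presign a GET URL for a placeholder bucket/key, valid for 15 minutes.
	bucket, key := "example-bucket", "example-key"
	req, err := presigner.PresignGetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	}, s3.WithPresignExpires(15*time.Minute))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(req.URL)
}

The binding itself builds its aws.Config through awsCommon.NewConfig rather than config.LoadDefaultConfig, but the client, uploader, and presign wiring follows the same shape.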

bindings/kafka/metadata.yaml

Lines changed: 7 additions & 0 deletions

@@ -372,3 +372,10 @@ metadata:
       - "range"
       - "sticky"
       - "roundrobin"
+  - name: excludeHeaderMetaRegex
+    type: string
+    required: false
+    description: |
+      A regular expression to exclude keys from being converted to/from headers from/to metadata to avoid unwanted downstream side effects.
+    example: '"^rawPayload|valueSchemaType$"'
+    default: '""'
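
Note: excludeHeaderMetaRegex is described as filtering which metadata keys get converted to/from Kafka headers. A rough sketch of that filtering idea, with a hypothetical filterMetadata helper (not the component's implementation):

package main

import (
	"fmt"
	"regexp"
)

// filterMetadata is a hypothetical helper: it drops any key matching the
// exclusion pattern before the remaining entries are turned into headers.
func filterMetadata(meta map[string]string, exclude *regexp.Regexp) map[string]string {
	out := make(map[string]string, len(meta))
	for k, v := range meta {
		if exclude != nil && exclude.MatchString(k) {
			continue // excluded from header conversion
		}
		out[k] = v
	}
	return out
}

func main() {
	exclude := regexp.MustCompile(`^rawPayload|valueSchemaType$`)
	meta := map[string]string{
		"rawPayload":      "true",
		"valueSchemaType": "Avro",
		"traceid":         "abc123",
	}
	fmt.Println(filterMetadata(meta, exclude)) // only traceid survives
}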

bindings/zeebe/command/metadata.yaml

Lines changed: 2 additions & 1 deletion

@@ -46,9 +46,10 @@ metadata:
     type: string
   - name: gatewayKeepAlive
     required: false
-    description: Sets how often keep alive messages should be sent to the gateway. Defaults to 45 seconds
+    description: Sets how often keep alive messages should be sent to the gateway.
     example: "45s"
     type: duration
+    default: "45s"
   - name: usePlainTextConnection
     required: false
     description: Whether to use a plain text connection or not
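
Note: the default for duration-typed fields such as gatewayKeepAlive now lives in a dedicated default field instead of the description text. An illustrative sketch of resolving such a value, assuming a hypothetical resolveDuration helper (not the binding's actual parser):

package main

import (
	"fmt"
	"time"
)

// resolveDuration is an illustrative helper: parse the configured value and
// fall back to the documented default when the field is unset.
func resolveDuration(configured, fallback string) (time.Duration, error) {
	if configured == "" {
		configured = fallback
	}
	return time.ParseDuration(configured)
}

func main() {
	keepAlive, err := resolveDuration("", "45s") // unset -> default "45s"
	if err != nil {
		panic(err)
	}
	fmt.Println(keepAlive) // 45s
}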

bindings/zeebe/jobworker/metadata.yaml

Lines changed: 12 additions & 6 deletions

@@ -20,9 +20,10 @@ metadata:
     type: string
   - name: gatewayKeepAlive
     required: false
-    description: Sets how often keep alive messages should be sent to the gateway. Defaults to 45 seconds
+    description: Sets how often keep alive messages should be sent to the gateway.
     example: "45s"
     type: duration
+    default: "45s"
   - name: usePlainTextConnection
     required: false
     description: Whether to use a plain text connection or not
@@ -40,39 +41,44 @@ metadata:
     type: string
   - name: workerTimeout
     required: false
-    description: A job returned after this call will not be activated by another call until the timeout has been reached; defaults to 5 minutes
+    description: A job returned after this call will not be activated by another call until the timeout has been reached.
     example: "5m"
     type: duration
   - name: requestTimeout
     required: false
     description: The request will be completed when at least one job is activated or after the requestTimeout. If the requestTimeout = 0, a default timeout is used. If the requestTimeout < 0, long polling is disabled and the request is completed immediately, even when no job is activated. Defaults to 10 seconds
     example: "30s"
     type: duration
+    default: "10s"
   - name: jobType
     required: true
     description: the job type, as defined in the BPMN process (e.g. <zeebe:taskDefinition type=\"fetch-products\" />)
     example: "fetch-products"
     type: string
   - name: maxJobsActive
     required: false
-    description: Set the maximum number of jobs which will be activated for this worker at the same time. Defaults to 32
+    description: Set the maximum number of jobs which will be activated for this worker at the same time.
     example: "32"
     type: number
+    default: "32"
   - name: concurrency
     required: false
-    description: The maximum number of concurrent spawned goroutines to complete jobs. Defaults to 4
+    description: The maximum number of concurrent spawned goroutines to complete jobs.
     example: "4"
+    default: "4"
     type: number
   - name: pollInterval
     required: false
-    description: Set the maximal interval between polling for new jobs. Defaults to 100 milliseconds
+    description: Set the maximal interval between polling for new jobs.
     example: "100ms"
     type: duration
+    default: "100ms"
   - name: pollThreshold
     required: false
-    description: Set the threshold of buffered activated jobs before polling for new jobs, i.e. threshold * maxJobsActive. Defaults to 0.3
+    description: Set the threshold of buffered activated jobs before polling for new jobs, i.e. threshold * maxJobsActive.
     example: "0.3"
     type: number
+    default: "0.3"
   - name: fetchVariables
     required: false
     description: A list of variables to fetch as the job variables; if empty, all visible variables at the time of activation for the scope of the job will be returned
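
Note: the pollThreshold description defines the re-poll point as threshold * maxJobsActive. A worked illustration with the documented defaults (0.3 and 32), using a hypothetical shouldPoll helper rather than the worker's actual scheduling code:

package main

import (
	"fmt"
	"math"
)

// shouldPoll is a hypothetical check: with the documented defaults, the worker
// asks the gateway for more jobs once its local buffer drops to
// pollThreshold * maxJobsActive jobs or fewer.
func shouldPoll(buffered, maxJobsActive int, pollThreshold float64) bool {
	threshold := int(math.Round(pollThreshold * float64(maxJobsActive)))
	return buffered <= threshold
}

func main() {
	const maxJobsActive = 32
	const pollThreshold = 0.3 // defaults from the metadata above

	fmt.Println(shouldPoll(20, maxJobsActive, pollThreshold)) // false: buffer still above 0.3*32 ≈ 10
	fmt.Println(shouldPoll(8, maxJobsActive, pollThreshold))  // true: time to poll for new jobs
}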
