diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index cce67bd3da..df2bc4a6d2 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -23,12 +23,13 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/cenkalti/backoff/v4" "github.com/google/uuid" - "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" - "github.com/vmware/vmware-go-kcl/clientlibrary/worker" + "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" + "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker" "github.com/dapr/components-contrib/bindings" awsAuth "github.com/dapr/components-contrib/common/authentication/aws" @@ -44,11 +45,12 @@ type AWSKinesis struct { worker *worker.Worker - streamName string - consumerName string - consumerARN *string - logger logger.Logger - consumerMode string + streamName string + consumerName string + consumerARN *string + logger logger.Logger + consumerMode string + applicationName string closed atomic.Bool closeCh chan struct{} @@ -65,6 +67,7 @@ type kinesisMetadata struct { SecretKey string `json:"secretKey" mapstructure:"secretKey"` SessionToken string `json:"sessionToken" mapstructure:"sessionToken"` KinesisConsumerMode string `json:"mode" mapstructure:"mode"` + ApplicationName string `json:"applicationName" mapstructure:"applicationName"` } const ( @@ -117,6 +120,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error a.streamName = m.StreamName a.consumerName = m.ConsumerName a.metadata = m + a.applicationName = m.ApplicationName opts := awsAuth.Options{ Logger: a.logger, @@ -158,19 +162,21 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er return errors.New("binding is closed") } - if a.metadata.KinesisConsumerMode == SharedThroughput { + switch a.metadata.KinesisConsumerMode { + case SharedThroughput: + // initalize worker configuration + config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName) // Configure the KCL worker with custom endpoints for LocalStack - config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode) if a.metadata.Endpoint != "" { - config.KinesisEndpoint = a.metadata.Endpoint - config.DynamoDBEndpoint = a.metadata.Endpoint + config.WithKinesisEndpoint(a.metadata.Endpoint) + config.WithDynamoDBEndpoint(a.metadata.Endpoint) } a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config) err = a.worker.Start() if err != nil { return err } - } else if a.metadata.KinesisConsumerMode == ExtendedFanout { + case ExtendedFanout: var stream *kinesis.DescribeStreamOutput stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) if err != nil { @@ -182,10 +188,18 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er } } - stream, err := a.authProvider.Kinesis().Stream(ctx, a.streamName) - if err != nil { - return fmt.Errorf("failed to get kinesis stream arn: %v", err) + var stream *string + /** + * Invoke this only when KinesisConsumerMode is set to 'extended' to avoid unnecessary calls. + */ + if a.metadata.KinesisConsumerMode == ExtendedFanout { + streamARN, err := a.authProvider.Kinesis().Stream(ctx, a.streamName) + if err != nil { + return fmt.Errorf("failed to get kinesis stream arn: %v", err) + } + stream = streamARN } + // Wait for context cancelation then stop a.wg.Add(1) go func() { diff --git a/bindings/aws/kinesis/kinesis_test.go b/bindings/aws/kinesis/kinesis_test.go index aaca0c3c0a..154e9baa57 100644 --- a/bindings/aws/kinesis/kinesis_test.go +++ b/bindings/aws/kinesis/kinesis_test.go @@ -25,14 +25,15 @@ import ( func TestParseMetadata(t *testing.T) { m := bindings.Metadata{} m.Properties = map[string]string{ - "accessKey": "key", - "region": "region", - "secretKey": "secret", - "consumerName": "test", - "streamName": "stream", - "mode": "extended", - "endpoint": "endpoint", - "sessionToken": "token", + "accessKey": "key", + "region": "region", + "secretKey": "secret", + "consumerName": "test", + "streamName": "stream", + "mode": "extended", + "endpoint": "endpoint", + "sessionToken": "token", + "applicationName": "applicationName", } kinesis := AWSKinesis{} meta, err := kinesis.parseMetadata(m) @@ -45,4 +46,5 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "endpoint", meta.Endpoint) assert.Equal(t, "token", meta.SessionToken) assert.Equal(t, "extended", meta.KinesisConsumerMode) + assert.Equal(t, "applicationName", meta.ApplicationName) } diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index b210e32944..75b4b179db 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -18,8 +18,8 @@ import ( "errors" "sync" + aws2 "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" @@ -36,7 +36,7 @@ import ( "github.com/aws/aws-sdk-go/service/ssm" "github.com/aws/aws-sdk-go/service/ssm/ssmiface" "github.com/aws/aws-sdk-go/service/sts" - "github.com/vmware/vmware-go-kcl/clientlibrary/config" + "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" ) type Clients struct { @@ -118,7 +118,7 @@ type ParameterStoreClients struct { type KinesisClients struct { Kinesis kinesisiface.KinesisAPI Region string - Credentials *credentials.Credentials + Credentials aws2.CredentialsProvider } type SesClients struct { @@ -173,7 +173,6 @@ func (c *ParameterStoreClients) New(session *session.Session) { func (c *KinesisClients) New(session *session.Session) { c.Kinesis = kinesis.New(session, session.Config) c.Region = *session.Config.Region - c.Credentials = session.Config.Credentials } func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string, error) { @@ -181,27 +180,27 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{ StreamName: aws.String(streamName), }) - if stream != nil { - return stream.StreamDescription.StreamARN, err + /** + * If the error is not nil, do not proceed to the next step + * as it may cause a nil pointer error on stream.StreamDescription.StreamARN. + */if err != nil { + return nil, err } + return stream.StreamDescription.StreamARN, err } return nil, errors.New("unable to get stream arn due to empty client") } -func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode string) *config.KinesisClientLibConfiguration { +func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { const sharedMode = "shared" if c.Kinesis != nil { if mode == sharedMode { - if c.Credentials != nil { - kclConfig := config.NewKinesisClientLibConfigWithCredential(consumer, - stream, c.Region, consumer, - c.Credentials) - return kclConfig - } + kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", nil) + return kclConfig + } } - return nil } diff --git a/common/authentication/aws/client_test.go b/common/authentication/aws/client_test.go index 85e0392aae..628fbb320a 100644 --- a/common/authentication/aws/client_test.go +++ b/common/authentication/aws/client_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" @@ -28,7 +29,7 @@ import ( "github.com/aws/aws-sdk-go/service/sqs/sqsiface" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/vmware/vmware-go-kcl/clientlibrary/config" + "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" ) type mockedSQS struct { @@ -186,12 +187,13 @@ func TestKinesisClients_Stream(t *testing.T) { func TestKinesisClients_WorkerCfg(t *testing.T) { testCreds := credentials.NewStaticCredentials("accessKey", "secretKey", "") tests := []struct { - name string - kinesisClient *KinesisClients - streamName string - consumer string - mode string - expectedConfig *config.KinesisClientLibConfiguration + name string + kinesisClient *KinesisClients + streamName string + consumer string + applicationName string + mode string + expectedConfig *config.KinesisClientLibConfiguration }{ { name: "successfully creates shared mode worker config", @@ -208,9 +210,10 @@ func TestKinesisClients_WorkerCfg(t *testing.T) { Region: "us-west-1", Credentials: testCreds, }, - streamName: "existing-stream", - consumer: "consumer1", - mode: "shared", + streamName: "existing-stream", + applicationName: "test-application", + consumer: "consumer1", + mode: "shared", expectedConfig: config.NewKinesisClientLibConfigWithCredential( "consumer1", "existing-stream", "us-west-1", "consumer1", testCreds, ), @@ -230,10 +233,11 @@ func TestKinesisClients_WorkerCfg(t *testing.T) { Region: "us-west-1", Credentials: testCreds, }, - streamName: "existing-stream", - consumer: "consumer1", - mode: "exclusive", - expectedConfig: nil, + streamName: "existing-stream", + applicationName: "test-application", + consumer: "consumer1", + mode: "exclusive", + expectedConfig: nil, }, { name: "returns nil when client is nil", @@ -242,16 +246,17 @@ func TestKinesisClients_WorkerCfg(t *testing.T) { Region: "us-west-1", Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), }, - streamName: "existing-stream", - consumer: "consumer1", - mode: "shared", - expectedConfig: nil, + streamName: "existing-stream", + applicationName: "test-application", + consumer: "consumer1", + mode: "shared", + expectedConfig: nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.consumer, tt.mode) + cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.mode, tt.applicationName) if tt.expectedConfig == nil { assert.Equal(t, tt.expectedConfig, cfg) return diff --git a/go.mod b/go.mod index 9d61f17c01..5020be5d0a 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/apache/thrift v0.13.0 github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a github.com/aws/aws-sdk-go v1.55.6 - github.com/aws/aws-sdk-go-v2 v1.36.5 + github.com/aws/aws-sdk-go-v2 v1.37.2 github.com/aws/aws-sdk-go-v2/config v1.29.17 github.com/aws/aws-sdk-go-v2/credentials v1.17.70 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.19.3 @@ -123,7 +123,7 @@ require ( github.com/tetratelabs/wazero v1.7.0 github.com/tmc/langchaingo v0.1.13 github.com/valyala/fasthttp v1.53.0 - github.com/vmware/vmware-go-kcl v1.5.1 + github.com/vmware/vmware-go-kcl-v2 v1.0.0 github.com/xdg-go/scram v1.1.2 go.etcd.io/etcd/client/v3 v3.5.21 go.mongodb.org/mongo-driver v1.14.0 @@ -203,10 +203,11 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 // indirect + github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect github.com/aws/smithy-go v1.22.5 // indirect - github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f // indirect + github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.4.0 // indirect diff --git a/go.sum b/go.sum index fb3f11c723..ac1ef82d74 100644 --- a/go.sum +++ b/go.sum @@ -274,15 +274,15 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a h1:QFemvMGPnajaeRBkFc1HoEA7qzVjUv+rkYb1/ps1/UE= github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI= -github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.55.6 h1:cSg4pvZ3m8dgYcgqB97MrcdjUmZ1BeMYKUxMMB89IPk= github.com/aws/aws-sdk-go v1.55.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= +github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= -github.com/aws/aws-sdk-go-v2 v1.36.5 h1:0OF9RiEMEdDdZEMqF9MRjevyxAQcf6gY+E7vwBILFj0= -github.com/aws/aws-sdk-go-v2 v1.36.5/go.mod h1:EYrzvCCN9CMUTa5+6lf6MM4tq3Zjp8UhSGR/cBsjai0= +github.com/aws/aws-sdk-go-v2 v1.37.2 h1:xkW1iMYawzcmYFYEV0UCMxc8gSsjCGEhBXQkdQywVbo= +github.com/aws/aws-sdk-go-v2 v1.37.2/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 h1:12SpdwU8Djs+YGklkinSSlcrPyj3H4VifVsKf78KbwA= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11/go.mod h1:dd+Lkp6YmMryke+qxW/VnKyhMBDTYP41Q2Bb+6gNZgY= github.com/aws/aws-sdk-go-v2/config v1.8.3/go.mod h1:4AEiLtAb8kLs7vgw2ZV3p2VZ1+hBavOc84hqxVNpCyw= @@ -319,6 +319,9 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.17/go.mod github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72HRZDLMtmVQiLG2tLfQcaWLCssELvGl+Zf2WVxMmR8= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 h1:t0E6FzREdtCsiLIoLCWsYliNsRBgyGD/MCK571qk4MI= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17/go.mod h1:ygpklyoaypuyDvOM5ujWGrYWpAK3h7ugnmKCU/76Ys4= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY= github.com/aws/aws-sdk-go-v2/service/sns v1.34.7 h1:OBuZE9Wt8h2imuRktu+WfjiTGrnYdCIJg8IX92aalHE= github.com/aws/aws-sdk-go-v2/service/sns v1.34.7/go.mod h1:4WYoZAhHt+dWYpoOQUgkUKfuQbE6Gg/hW4oXE0pKS9U= github.com/aws/aws-sdk-go-v2/service/sqs v1.38.8 h1:80dpSqWMwx2dAm30Ib7J6ucz1ZHfiv5OCRwN/EnCOXQ= @@ -336,8 +339,8 @@ github.com/aws/rolesanywhere-credential-helper v1.0.4/go.mod h1:QVGNxlDlYhjR0/ZU github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -1718,8 +1721,8 @@ github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4 github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= -github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk= -github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo= +github.com/vmware/vmware-go-kcl-v2 v1.0.0 h1:HPT5vu+khRmGspBSc/+AilEWbRGoTZhjlYqdrBbRMZs= +github.com/vmware/vmware-go-kcl-v2 v1.0.0/go.mod h1:GBDu+P4Neo0vwZAk0ZUCEC8GYsUOWvi3XhFwAZR3SjA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=