@@ -27,7 +27,6 @@ import (
2727 "github.com/aws/aws-sdk-go/service/kinesis"
2828 "github.com/cenkalti/backoff/v4"
2929 "github.com/google/uuid"
30- "github.com/vmware/vmware-go-kcl/clientlibrary/config"
3130 "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
3231 "github.com/vmware/vmware-go-kcl/clientlibrary/worker"
3332
@@ -40,15 +39,16 @@ import (
4039
4140// AWSKinesis allows receiving and sending data to/from AWS Kinesis stream.
4241type AWSKinesis struct {
43- client * kinesis. Kinesis
44- metadata * kinesisMetadata
42+ authProvider awsAuth. Provider
43+ metadata * kinesisMetadata
4544
46- worker * worker.Worker
47- workerConfig * config.KinesisClientLibConfiguration
45+ worker * worker.Worker
4846
49- streamARN * string
50- consumerARN * string
51- logger logger.Logger
47+ streamName string
48+ consumerName string
49+ consumerARN * string
50+ logger logger.Logger
51+ consumerMode string
5252
5353 closed atomic.Bool
5454 closeCh chan struct {}
@@ -112,30 +112,25 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error
112112 return fmt .Errorf ("%s invalid \" mode\" field %s" , "aws.kinesis" , m .KinesisConsumerMode )
113113 }
114114
115- client , err := a . getClient ( m )
116- if err != nil {
117- return err
118- }
115+ a . consumerMode = m . KinesisConsumerMode
116+ a . streamName = m . StreamName
117+ a . consumerName = m . ConsumerName
118+ a . metadata = m
119119
120- streamName := aws .String (m .StreamName )
121- stream , err := client .DescribeStreamWithContext (ctx , & kinesis.DescribeStreamInput {
122- StreamName : streamName ,
123- })
120+ opts := awsAuth.Options {
121+ Logger : a .logger ,
122+ Properties : metadata .Properties ,
123+ Region : m .Region ,
124+ AccessKey : m .AccessKey ,
125+ SecretKey : m .SecretKey ,
126+ SessionToken : "" ,
127+ }
128+ // extra configs needed per component type
129+ provider , err := awsAuth .NewProvider (ctx , opts , awsAuth .GetConfig (opts ))
124130 if err != nil {
125131 return err
126132 }
127-
128- if m .KinesisConsumerMode == SharedThroughput {
129- kclConfig := config .NewKinesisClientLibConfigWithCredential (m .ConsumerName ,
130- m .StreamName , m .Region , m .ConsumerName ,
131- client .Config .Credentials )
132- a .workerConfig = kclConfig
133- }
134-
135- a .streamARN = stream .StreamDescription .StreamARN
136- a .metadata = m
137- a .client = client
138-
133+ a .authProvider = provider
139134 return nil
140135}
141136
@@ -148,7 +143,7 @@ func (a *AWSKinesis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
148143 if partitionKey == "" {
149144 partitionKey = uuid .New ().String ()
150145 }
151- _ , err := a .client .PutRecordWithContext (ctx , & kinesis.PutRecordInput {
146+ _ , err := a .authProvider . Kinesis (). Kinesis .PutRecordWithContext (ctx , & kinesis.PutRecordInput {
152147 StreamName : & a .metadata .StreamName ,
153148 Data : req .Data ,
154149 PartitionKey : & partitionKey ,
@@ -161,16 +156,15 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
161156 if a .closed .Load () {
162157 return errors .New ("binding is closed" )
163158 }
164-
165159 if a .metadata .KinesisConsumerMode == SharedThroughput {
166- a .worker = worker .NewWorker (a .recordProcessorFactory (ctx , handler ), a .workerConfig )
160+ a .worker = worker .NewWorker (a .recordProcessorFactory (ctx , handler ), a .authProvider . Kinesis (). WorkerCfg ( ctx , a . streamName , a . consumerName , a . consumerMode ) )
167161 err = a .worker .Start ()
168162 if err != nil {
169163 return err
170164 }
171165 } else if a .metadata .KinesisConsumerMode == ExtendedFanout {
172166 var stream * kinesis.DescribeStreamOutput
173- stream , err = a .client .DescribeStream (& kinesis.DescribeStreamInput {StreamName : & a .metadata .StreamName })
167+ stream , err = a .authProvider . Kinesis (). Kinesis .DescribeStream (& kinesis.DescribeStreamInput {StreamName : & a .metadata .StreamName })
174168 if err != nil {
175169 return err
176170 }
@@ -180,6 +174,10 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
180174 }
181175 }
182176
177+ stream , err := a .authProvider .Kinesis ().Stream (ctx , a .streamName )
178+ if err != nil {
179+ return fmt .Errorf ("failed to get kinesis stream arn: %v" , err )
180+ }
183181 // Wait for context cancelation then stop
184182 a .wg .Add (1 )
185183 go func () {
@@ -191,7 +189,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
191189 if a .metadata .KinesisConsumerMode == SharedThroughput {
192190 a .worker .Shutdown ()
193191 } else if a .metadata .KinesisConsumerMode == ExtendedFanout {
194- a .deregisterConsumer (a . streamARN , a .consumerARN )
192+ a .deregisterConsumer (ctx , stream , a .consumerARN )
195193 }
196194 }()
197195
@@ -226,8 +224,7 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes
226224 return
227225 default :
228226 }
229-
230- sub , err := a .client .SubscribeToShardWithContext (ctx , & kinesis.SubscribeToShardInput {
227+ sub , err := a .authProvider .Kinesis ().Kinesis .SubscribeToShardWithContext (ctx , & kinesis.SubscribeToShardInput {
231228 ConsumerARN : consumerARN ,
232229 ShardId : s .ShardId ,
233230 StartingPosition : & kinesis.StartingPosition {Type : aws .String (kinesis .ShardIteratorTypeLatest )},
@@ -269,14 +266,14 @@ func (a *AWSKinesis) Close() error {
269266 close (a .closeCh )
270267 }
271268 a .wg .Wait ()
272- return nil
269+ return a . authProvider . Close ()
273270}
274271
275272func (a * AWSKinesis ) ensureConsumer (ctx context.Context , streamARN * string ) (* string , error ) {
276273 // Only set timeout on consumer call.
277274 conCtx , cancel := context .WithTimeout (ctx , 30 * time .Second )
278275 defer cancel ()
279- consumer , err := a .client .DescribeStreamConsumerWithContext (conCtx , & kinesis.DescribeStreamConsumerInput {
276+ consumer , err := a .authProvider . Kinesis (). Kinesis .DescribeStreamConsumerWithContext (conCtx , & kinesis.DescribeStreamConsumerInput {
280277 ConsumerName : & a .metadata .ConsumerName ,
281278 StreamARN : streamARN ,
282279 })
@@ -288,7 +285,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st
288285}
289286
290287func (a * AWSKinesis ) registerConsumer (ctx context.Context , streamARN * string ) (* string , error ) {
291- consumer , err := a .client .RegisterStreamConsumerWithContext (ctx , & kinesis.RegisterStreamConsumerInput {
288+ consumer , err := a .authProvider . Kinesis (). Kinesis .RegisterStreamConsumerWithContext (ctx , & kinesis.RegisterStreamConsumerInput {
292289 ConsumerName : & a .metadata .ConsumerName ,
293290 StreamARN : streamARN ,
294291 })
@@ -307,11 +304,11 @@ func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (*
307304 return consumer .Consumer .ConsumerARN , nil
308305}
309306
310- func (a * AWSKinesis ) deregisterConsumer (streamARN * string , consumerARN * string ) error {
307+ func (a * AWSKinesis ) deregisterConsumer (ctx context. Context , streamARN * string , consumerARN * string ) error {
311308 if a .consumerARN != nil {
312309 // Use a background context because the running context may have been canceled already
313310 ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
314- _ , err := a .client .DeregisterStreamConsumerWithContext (ctx , & kinesis.DeregisterStreamConsumerInput {
311+ _ , err := a .authProvider . Kinesis (). Kinesis .DeregisterStreamConsumerWithContext (ctx , & kinesis.DeregisterStreamConsumerInput {
315312 ConsumerARN : consumerARN ,
316313 StreamARN : streamARN ,
317314 ConsumerName : & a .metadata .ConsumerName ,
@@ -342,7 +339,7 @@ func (a *AWSKinesis) waitUntilConsumerExists(ctx aws.Context, input *kinesis.Des
342339 tmp := * input
343340 inCpy = & tmp
344341 }
345- req , _ := a .client .DescribeStreamConsumerRequest (inCpy )
342+ req , _ := a .authProvider . Kinesis (). Kinesis .DescribeStreamConsumerRequest (inCpy )
346343 req .SetContext (ctx )
347344 req .ApplyOptions (opts ... )
348345
@@ -354,16 +351,6 @@ func (a *AWSKinesis) waitUntilConsumerExists(ctx aws.Context, input *kinesis.Des
354351 return w .WaitWithContext (ctx )
355352}
356353
357- func (a * AWSKinesis ) getClient (metadata * kinesisMetadata ) (* kinesis.Kinesis , error ) {
358- sess , err := awsAuth .GetClient (metadata .AccessKey , metadata .SecretKey , metadata .SessionToken , metadata .Region , metadata .Endpoint )
359- if err != nil {
360- return nil , err
361- }
362- k := kinesis .New (sess )
363-
364- return k , nil
365- }
366-
367354func (a * AWSKinesis ) parseMetadata (meta bindings.Metadata ) (* kinesisMetadata , error ) {
368355 var m kinesisMetadata
369356 err := kitmd .DecodeMetadata (meta .Properties , & m )
0 commit comments