@@ -16,13 +16,8 @@ package aws
 import (
 	"context"
 	"errors"
-	"fmt"
 	"sync"
-	"time"
 
-	"github.com/IBM/sarama"
-	"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
-	aws2 "github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/aws/aws-sdk-go/aws/session"
@@ -56,7 +51,6 @@ type Clients struct {
 	ParameterStore *ParameterStoreClients
 	kinesis        *KinesisClients
 	ses            *SesClients
-	kafka          *KafkaClients
 }
 
 func newClients() *Clients {
@@ -85,14 +79,6 @@ func (c *Clients) refresh(session *session.Session) error {
 		c.kinesis.New(session)
 	case c.ses != nil:
 		c.ses.New(session)
-	case c.kafka != nil:
-		// Note: we pass in nil for token provider
-		// as there are no special fields for x509 auth for it.
-		// Only static auth passes it in.
-		err := c.kafka.New(session, nil)
-		if err != nil {
-			return fmt.Errorf("failed to refresh Kafka AWS IAM Config: %w", err)
-		}
 	}
 	return nil
 }
@@ -139,16 +125,6 @@ type SesClients struct {
 	Ses *ses.SES
 }
 
-type KafkaClients struct {
-	config          *sarama.Config
-	consumerGroup   *string
-	brokers         *[]string
-	maxMessageBytes *int
-
-	ConsumerGroup sarama.ConsumerGroup
-	Producer      sarama.SyncProducer
-}
-
 func (c *S3Clients) New(session *session.Session) {
 	refreshedS3 := s3.New(session, session.Config)
 	c.S3 = refreshedS3
@@ -232,120 +208,3 @@ func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode s
 func (c *SesClients) New(session *session.Session) {
 	c.Ses = ses.New(session, session.Config)
 }
-
-type KafkaOptions struct {
-	Config          *sarama.Config
-	ConsumerGroup   string
-	Brokers         []string
-	MaxMessageBytes int
-}
-
-func initKafkaClients(opts KafkaOptions) *KafkaClients {
-	return &KafkaClients{
-		config:          opts.Config,
-		consumerGroup:   &opts.ConsumerGroup,
-		brokers:         &opts.Brokers,
-		maxMessageBytes: &opts.MaxMessageBytes,
-	}
-}
-
-func (c *KafkaClients) New(session *session.Session, tokenProvider *mskTokenProvider) error {
-	const timeout = 10 * time.Second
-	creds, err := session.Config.Credentials.Get()
-	if err != nil {
-		return fmt.Errorf("failed to get credentials from session: %w", err)
-	}
-
-	// fill in token provider common fields across x509 and static auth
-	if tokenProvider == nil {
-		tokenProvider = &mskTokenProvider{}
-	}
-	tokenProvider.generateTokenTimeout = timeout
-	tokenProvider.region = *session.Config.Region
-	tokenProvider.accessKey = creds.AccessKeyID
-	tokenProvider.secretKey = creds.SecretAccessKey
-	tokenProvider.sessionToken = creds.SessionToken
-
-	c.config.Net.SASL.Enable = true
-	c.config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
-	c.config.Net.SASL.TokenProvider = tokenProvider
-
-	_, err = c.config.Net.SASL.TokenProvider.Token()
-	if err != nil {
-		return fmt.Errorf("error validating iam credentials %v", err)
-	}
-
-	consumerGroup, err := sarama.NewConsumerGroup(*c.brokers, *c.consumerGroup, c.config)
-	if err != nil {
-		return err
-	}
-	c.ConsumerGroup = consumerGroup
-
-	producer, err := c.getSyncProducer()
-	if err != nil {
-		return err
-	}
-	c.Producer = producer
-
-	return nil
-}
-
-// Kafka specific
-type mskTokenProvider struct {
-	generateTokenTimeout time.Duration
-	accessKey            string
-	secretKey            string
-	sessionToken         string
-	awsIamRoleArn        string
-	awsStsSessionName    string
-	region               string
-}
-
-func (m *mskTokenProvider) Token() (*sarama.AccessToken, error) {
-	// this function can't use the context passed on Init because that context would be cancelled right after Init
-	ctx, cancel := context.WithTimeout(context.Background(), m.generateTokenTimeout)
-	defer cancel()
-
-	switch {
-	// we must first check if we are using the assume role auth profile
-	case m.awsIamRoleArn != "" && m.awsStsSessionName != "":
-		token, _, err := signer.GenerateAuthTokenFromRole(ctx, m.region, m.awsIamRoleArn, m.awsStsSessionName)
-		return &sarama.AccessToken{Token: token}, err
-	case m.accessKey != "" && m.secretKey != "":
-		token, _, err := signer.GenerateAuthTokenFromCredentialsProvider(ctx, m.region, aws2.CredentialsProviderFunc(func(ctx context.Context) (aws2.Credentials, error) {
-			return aws2.Credentials{
-				AccessKeyID:     m.accessKey,
-				SecretAccessKey: m.secretKey,
-				SessionToken:    m.sessionToken,
-			}, nil
-		}))
-		return &sarama.AccessToken{Token: token}, err
-
-	default: // load default aws creds
-		token, _, err := signer.GenerateAuthToken(ctx, m.region)
-		return &sarama.AccessToken{Token: token}, err
-	}
-}
-
-func (c *KafkaClients) getSyncProducer() (sarama.SyncProducer, error) {
-	// Add SyncProducer specific properties to copy of base config
-	c.config.Producer.RequiredAcks = sarama.WaitForAll
-	c.config.Producer.Retry.Max = 5
-	c.config.Producer.Return.Successes = true
-
-	if *c.maxMessageBytes > 0 {
-		c.config.Producer.MaxMessageBytes = *c.maxMessageBytes
-	}
-
-	saramaClient, err := sarama.NewClient(*c.brokers, c.config)
-	if err != nil {
-		return nil, err
-	}
-
-	producer, err := sarama.NewSyncProducerFromClient(saramaClient)
-	if err != nil {
-		return nil, err
-	}
-
-	return producer, nil
-}