Skip to content

Commit 931089f

Browse files
committed
use sub object within kafka for cloud provider specific settings related to kafka
1 parent 79a5a20 commit 931089f

File tree

3 files changed

+36
-26
lines changed

3 files changed

+36
-26
lines changed

docs/config.md

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,11 @@ ACHGateway:
7373
TLS: <boolean>
7474
AutoCommit: <boolean>
7575
[ SASLMechanism: <string> | default = "PLAIN" ]
76-
[ AWSRegion: <string> | default = "" ]
77-
[ AWSProfile: <string> | default = "" ]
78-
[ AWSRoleARN: <string> | default = "" ]
79-
[ AWSSessionName: <string> | default = "" ]
76+
AWS:
77+
[ Region: <string> | default = "" ]
78+
[ Profile: <string> | default = "" ]
79+
[ RoleARN: <string> | default = "" ]
80+
[ SessionName: <string> | default = "" ]
8081
Transform:
8182
Encoding:
8283
[ Base64: <boolean> | default = false ]
@@ -162,10 +163,11 @@ ACHGateway:
162163
[ TLS: <boolean> | default = false ]
163164
[ AutoCommit: <boolean> | default = false ]
164165
[ SASLMechanism: <string> | default = "PLAIN" ]
165-
[ AWSRegion: <string> | default = "" ]
166-
[ AWSProfile: <string> | default = "" ]
167-
[ AWSRoleARN: <string> | default = "" ]
168-
[ AWSSessionName: <string> | default = "" ]
166+
AWS:
167+
[ Region: <string> | default = "" ]
168+
[ Profile: <string> | default = "" ]
169+
[ RoleARN: <string> | default = "" ]
170+
[ SessionName: <string> | default = "" ]
169171
Webhook:
170172
[ Endpoint: <string> | default = "" ]
171173
```

internal/kafka/kafka.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,15 @@ func OpenTopic(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Topic, erro
5454

5555
switch cfg.SASLMechanism {
5656
case "AWS_MSK_IAM":
57-
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
58-
config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{
59-
Region: cfg.AWSRegion,
60-
Profile: cfg.AWSProfile,
61-
RoleARN: cfg.AWSRoleARN,
62-
SessionName: cfg.AWSSessionName,
57+
if cfg.AWS != nil {
58+
config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{
59+
Region: cfg.AWS.Region,
60+
Profile: cfg.AWS.Profile,
61+
RoleARN: cfg.AWS.RoleARN,
62+
SessionName: cfg.AWS.SessionName,
63+
}
6364
}
65+
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
6466
config.Net.TLS.Enable = true
6567
config.Net.TLS.Config = &tls.Config{}
6668

@@ -82,7 +84,6 @@ func OpenTopic(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Topic, erro
8284
Set("sasl.mechanism", log.String(string(config.Net.SASL.Mechanism))).
8385
Set("sasl.user", log.String(cfg.Key)).
8486
Set("topic", log.String(cfg.Topic)).
85-
Set("aws.region", log.String(cfg.AWSRegion)).
8687
Log("opening kafka topic")
8788

8889
return kafkapubsub.OpenTopic(cfg.Brokers, config, cfg.Topic, nil)
@@ -97,13 +98,15 @@ func OpenSubscription(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Subs
9798
// Default to PLAIN if no SASL mechanism is specified
9899
switch cfg.SASLMechanism {
99100
case "AWS_MSK_IAM":
100-
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
101-
config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{
102-
Region: cfg.AWSRegion,
103-
Profile: cfg.AWSProfile,
104-
RoleARN: cfg.AWSRoleARN,
105-
SessionName: cfg.AWSSessionName,
101+
if cfg.AWS != nil {
102+
config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{
103+
Region: cfg.AWS.Region,
104+
Profile: cfg.AWS.Profile,
105+
RoleARN: cfg.AWS.RoleARN,
106+
SessionName: cfg.AWS.SessionName,
107+
}
106108
}
109+
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
107110
config.Net.TLS.Enable = true
108111
config.Net.TLS.Config = &tls.Config{}
109112

@@ -129,7 +132,6 @@ func OpenSubscription(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Subs
129132
Set("sasl.mechanism", log.String(string(config.Net.SASL.Mechanism))).
130133
Set("sasl.user", log.String(cfg.Key)).
131134
Set("topic", log.String(cfg.Topic)).
132-
Set("aws.region", log.String(cfg.AWSRegion)).
133135
Log("setting up kafka subscription")
134136

135137
return kafkapubsub.OpenSubscription(cfg.Brokers, config, cfg.Group, []string{cfg.Topic}, &kafkapubsub.SubscriptionOptions{

internal/service/model_inbound.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,15 @@ func (cfg *InMemory) Validate() error {
6666
return nil
6767
}
6868

69+
// AWSConfig holds authentication settings for AWS MSK clusters.
70+
// Used when KafkaConfig.SASLMechanism is set to AWS_MSK_IAM.
71+
type AWSConfig struct {
72+
Region string
73+
Profile string
74+
RoleARN string
75+
SessionName string
76+
}
77+
6978
type KafkaConfig struct {
7079
Brokers []string
7180
Key string
@@ -80,10 +89,7 @@ type KafkaConfig struct {
8089
AutoCommit bool
8190
SASLMechanism string
8291

83-
AWSRegion string
84-
AWSProfile string
85-
AWSRoleARN string
86-
AWSSessionName string
92+
AWS *AWSConfig
8793

8894
Consumer KafkaConsumerConfig
8995
Producer KafkaProducerConfig

0 commit comments

Comments
 (0)