Skip to content

Commit 8fb9521

Browse files
authored
Merge pull request #251 from myronjhicks/add-sasl-mechanism-config
Add Support for AWS MSK IAM authentication
2 parents f0e2bb5 + b137264 commit 8fb9521

File tree

4 files changed

+103
-3
lines changed

4 files changed

+103
-3
lines changed

docs/config.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ ACHGateway:
7272
Topic: <string>
7373
TLS: <boolean>
7474
AutoCommit: <boolean>
75+
[ SASLMechanism: <string> | default = "PLAIN" ]
76+
Provider:
77+
AWS:
78+
[ Region: <string> | default = "" ]
79+
[ Profile: <string> | default = "" ]
80+
[ RoleARN: <string> | default = "" ]
81+
[ SessionName: <string> | default = "" ]
7582
Transform:
7683
Encoding:
7784
[ Base64: <boolean> | default = false ]
@@ -156,6 +163,13 @@ ACHGateway:
156163
Topic: <string>
157164
[ TLS: <boolean> | default = false ]
158165
[ AutoCommit: <boolean> | default = false ]
166+
[ SASLMechanism: <string> | default = "PLAIN" ]
167+
Provider:
168+
AWS:
169+
[ Region: <string> | default = "" ]
170+
[ Profile: <string> | default = "" ]
171+
[ RoleARN: <string> | default = "" ]
172+
[ SessionName: <string> | default = "" ]
159173
Webhook:
160174
[ Endpoint: <string> | default = "" ]
161175
```

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ require (
6767
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.24.1 // indirect
6868
github.com/Microsoft/go-winio v0.6.2 // indirect
6969
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
70+
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1
7071
github.com/aws/aws-sdk-go v1.50.36 // indirect
7172
github.com/aws/aws-sdk-go-v2 v1.25.3 // indirect
7273
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect

internal/kafka/kafka.go

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package kafka
22

33
import (
4+
"context"
5+
"crypto/tls"
46
"time"
57

68
"github.com/moov-io/achgateway/internal/service"
79
"github.com/moov-io/base/log"
810

911
"github.com/Shopify/sarama"
12+
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
1013
"gocloud.dev/pubsub"
1114
"gocloud.dev/pubsub/kafkapubsub"
1215
)
@@ -15,13 +18,58 @@ var (
1518
minKafkaVersion = sarama.V2_6_0_0
1619
)
1720

21+
type MSKAccessTokenProvider struct {
22+
Region string
23+
Profile string
24+
RoleARN string
25+
SessionName string
26+
}
27+
28+
func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) {
29+
var token string
30+
var err error
31+
32+
// Choose the correct AWS authentication method
33+
switch {
34+
case m.Profile != "":
35+
token, _, err = signer.GenerateAuthTokenFromProfile(context.TODO(), m.Region, m.Profile)
36+
case m.RoleARN != "":
37+
token, _, err = signer.GenerateAuthTokenFromRole(context.TODO(), m.Region, m.RoleARN, m.SessionName)
38+
default:
39+
token, _, err = signer.GenerateAuthToken(context.TODO(), m.Region)
40+
}
41+
42+
if err != nil {
43+
return nil, err
44+
}
45+
return &sarama.AccessToken{Token: token}, nil
46+
}
47+
1848
func OpenTopic(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Topic, error) {
1949
config := kafkapubsub.MinimalConfig()
2050
config.Version = minKafkaVersion
2151
config.Net.TLS.Enable = cfg.TLS
2252

2353
config.Net.SASL.Enable = cfg.Key != ""
24-
config.Net.SASL.Mechanism = sarama.SASLMechanism("PLAIN")
54+
55+
switch cfg.SASLMechanism {
56+
case "AWS_MSK_IAM":
57+
if cfg.Provider != nil && cfg.Provider.AWS != nil {
58+
config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{
59+
Region: cfg.Provider.AWS.Region,
60+
Profile: cfg.Provider.AWS.Profile,
61+
RoleARN: cfg.Provider.AWS.RoleARN,
62+
SessionName: cfg.Provider.AWS.SessionName,
63+
}
64+
}
65+
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
66+
config.Net.TLS.Enable = true
67+
config.Net.TLS.Config = &tls.Config{}
68+
69+
default:
70+
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
71+
}
72+
2573
config.Net.SASL.User = cfg.Key
2674
config.Net.SASL.Password = cfg.Secret
2775

@@ -33,6 +81,7 @@ func OpenTopic(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Topic, erro
3381
Set("tls", log.Bool(cfg.TLS)).
3482
Set("group", log.String(cfg.Group)).
3583
Set("sasl.enable", log.Bool(config.Net.SASL.Enable)).
84+
Set("sasl.mechanism", log.String(string(config.Net.SASL.Mechanism))).
3685
Set("sasl.user", log.String(cfg.Key)).
3786
Set("topic", log.String(cfg.Topic)).
3887
Log("opening kafka topic")
@@ -46,7 +95,25 @@ func OpenSubscription(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Subs
4695
config.Net.TLS.Enable = cfg.TLS
4796

4897
config.Net.SASL.Enable = cfg.Key != ""
49-
config.Net.SASL.Mechanism = sarama.SASLMechanism("PLAIN")
98+
// Default to PLAIN if no SASL mechanism is specified
99+
switch cfg.SASLMechanism {
100+
case "AWS_MSK_IAM":
101+
if cfg.Provider != nil && cfg.Provider.AWS != nil {
102+
config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{
103+
Region: cfg.Provider.AWS.Region,
104+
Profile: cfg.Provider.AWS.Profile,
105+
RoleARN: cfg.Provider.AWS.RoleARN,
106+
SessionName: cfg.Provider.AWS.SessionName,
107+
}
108+
}
109+
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
110+
config.Net.TLS.Enable = true
111+
config.Net.TLS.Config = &tls.Config{}
112+
113+
default:
114+
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
115+
}
116+
50117
config.Net.SASL.User = cfg.Key
51118
config.Net.SASL.Password = cfg.Secret
52119

@@ -62,6 +129,7 @@ func OpenSubscription(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Subs
62129
Set("tls", log.Bool(cfg.TLS)).
63130
Set("group", log.String(cfg.Group)).
64131
Set("sasl.enable", log.Bool(config.Net.SASL.Enable)).
132+
Set("sasl.mechanism", log.String(string(config.Net.SASL.Mechanism))).
65133
Set("sasl.user", log.String(cfg.Key)).
66134
Set("topic", log.String(cfg.Topic)).
67135
Log("setting up kafka subscription")

internal/service/model_inbound.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,20 @@ func (cfg *InMemory) Validate() error {
6666
return nil
6767
}
6868

69+
// AWSConfig holds authentication settings for AWS MSK clusters.
70+
// Used when KafkaConfig.SASLMechanism is set to AWS_MSK_IAM.
71+
type AWSConfig struct {
72+
Region string
73+
Profile string
74+
RoleARN string
75+
SessionName string
76+
}
77+
78+
// KafkaProvider represents the cloud provider settings for Kafka
79+
type KafkaProvider struct {
80+
AWS *AWSConfig
81+
}
82+
6983
type KafkaConfig struct {
7084
Brokers []string
7185
Key string
@@ -77,7 +91,10 @@ type KafkaConfig struct {
7791
// AutoCommit in Sarama refers to "automated publishing of consumer offsets
7892
// to the broker" rather than a Kafka broker's meaning of "commit consumer
7993
// offsets on read" which leads to "at-most-once" delivery.
80-
AutoCommit bool
94+
AutoCommit bool
95+
SASLMechanism string
96+
97+
Provider *KafkaProvider
8198

8299
Consumer KafkaConsumerConfig
83100
Producer KafkaProducerConfig

0 commit comments

Comments
 (0)