1
1
package kafka
2
2
3
3
import (
4
+ "context"
5
+ "crypto/tls"
4
6
"time"
5
7
6
8
"github.com/moov-io/achgateway/internal/service"
7
9
"github.com/moov-io/base/log"
8
10
9
11
"github.com/Shopify/sarama"
12
+ "github.com/aws/aws-msk-iam-sasl-signer-go/signer"
10
13
"gocloud.dev/pubsub"
11
14
"gocloud.dev/pubsub/kafkapubsub"
12
15
)
@@ -15,30 +18,66 @@ var (
15
18
minKafkaVersion = sarama .V2_6_0_0
16
19
)
17
20
21
+ type MSKAccessTokenProvider struct {
22
+ Region string
23
+ Profile string
24
+ RoleARN string
25
+ SessionName string
26
+ }
27
+
28
+ func (m * MSKAccessTokenProvider ) Token () (* sarama.AccessToken , error ) {
29
+ var token string
30
+ var err error
31
+
32
+ // Choose the correct AWS authentication method
33
+ switch {
34
+ case m .Profile != "" :
35
+ token , _ , err = signer .GenerateAuthTokenFromProfile (context .TODO (), m .Region , m .Profile )
36
+ case m .RoleARN != "" :
37
+ token , _ , err = signer .GenerateAuthTokenFromRole (context .TODO (), m .Region , m .RoleARN , m .SessionName )
38
+ default :
39
+ token , _ , err = signer .GenerateAuthToken (context .TODO (), m .Region )
40
+ }
41
+
42
+ if err != nil {
43
+ return nil , err
44
+ }
45
+ return & sarama.AccessToken {Token : token }, nil
46
+ }
47
+
18
48
func OpenTopic (logger log.Logger , cfg * service.KafkaConfig ) (* pubsub.Topic , error ) {
19
49
config := kafkapubsub .MinimalConfig ()
20
50
config .Version = minKafkaVersion
21
51
config .Net .TLS .Enable = cfg .TLS
22
52
23
53
config .Net .SASL .Enable = cfg .Key != ""
24
- config .Net .SASL .Mechanism = sarama .SASLMechanism (cfg .SASLMechanism )
25
-
26
54
// Default to PLAIN if no SASL mechanism is specified
27
55
switch cfg .SASLMechanism {
28
56
case "SCRAM-SHA-512" :
29
57
config .Net .SASL .SCRAMClientGeneratorFunc = func () sarama.SCRAMClient {
30
58
return & XDGSCRAMClient {HashGeneratorFcn : SHA512 }
31
59
}
32
- config .Net .SASL .Mechanism = sarama .SASLMechanism ( cfg . SASLMechanism )
60
+ config .Net .SASL .Mechanism = sarama .SASLTypeSCRAMSHA512
33
61
34
62
case "SCRAM-SHA-256" :
35
63
config .Net .SASL .SCRAMClientGeneratorFunc = func () sarama.SCRAMClient {
36
64
return & XDGSCRAMClient {HashGeneratorFcn : SHA256 }
37
65
}
38
- config .Net .SASL .Mechanism = sarama .SASLMechanism (cfg .SASLMechanism )
66
+ config .Net .SASL .Mechanism = sarama .SASLTypeSCRAMSHA256
67
+
68
+ case "AWS_MSK_IAM" :
69
+ config .Net .SASL .Mechanism = sarama .SASLTypeOAuth
70
+ config .Net .SASL .TokenProvider = & MSKAccessTokenProvider {
71
+ Region : cfg .AWSRegion ,
72
+ Profile : cfg .AWSProfile ,
73
+ RoleARN : cfg .AWSRoleARN ,
74
+ SessionName : cfg .AWSSessionName ,
75
+ }
76
+ config .Net .TLS .Enable = true
77
+ config .Net .TLS .Config = & tls.Config {}
39
78
40
79
default :
41
- config .Net .SASL .Mechanism = sarama .SASLMechanism ( "PLAIN" )
80
+ config .Net .SASL .Mechanism = sarama .SASLTypePlaintext
42
81
}
43
82
44
83
config .Net .SASL .User = cfg .Key
@@ -52,8 +91,10 @@ func OpenTopic(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Topic, erro
52
91
Set ("tls" , log .Bool (cfg .TLS )).
53
92
Set ("group" , log .String (cfg .Group )).
54
93
Set ("sasl.enable" , log .Bool (config .Net .SASL .Enable )).
94
+ Set ("sasl.mechanism" , log .String (string (config .Net .SASL .Mechanism ))).
55
95
Set ("sasl.user" , log .String (cfg .Key )).
56
96
Set ("topic" , log .String (cfg .Topic )).
97
+ Set ("aws.region" , log .String (cfg .AWSRegion )).
57
98
Log ("opening kafka topic" )
58
99
59
100
return kafkapubsub .OpenTopic (cfg .Brokers , config , cfg .Topic , nil )
@@ -71,16 +112,27 @@ func OpenSubscription(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Subs
71
112
config .Net .SASL .SCRAMClientGeneratorFunc = func () sarama.SCRAMClient {
72
113
return & XDGSCRAMClient {HashGeneratorFcn : SHA512 }
73
114
}
74
- config .Net .SASL .Mechanism = sarama .SASLMechanism ( cfg . SASLMechanism )
115
+ config .Net .SASL .Mechanism = sarama .SASLTypeSCRAMSHA512
75
116
76
117
case "SCRAM-SHA-256" :
77
118
config .Net .SASL .SCRAMClientGeneratorFunc = func () sarama.SCRAMClient {
78
119
return & XDGSCRAMClient {HashGeneratorFcn : SHA256 }
79
120
}
80
- config .Net .SASL .Mechanism = sarama .SASLMechanism (cfg .SASLMechanism )
121
+ config .Net .SASL .Mechanism = sarama .SASLTypeSCRAMSHA256
122
+
123
+ case "AWS_MSK_IAM" :
124
+ config .Net .SASL .Mechanism = sarama .SASLTypeOAuth
125
+ config .Net .SASL .TokenProvider = & MSKAccessTokenProvider {
126
+ Region : cfg .AWSRegion ,
127
+ Profile : cfg .AWSProfile ,
128
+ RoleARN : cfg .AWSRoleARN ,
129
+ SessionName : cfg .AWSSessionName ,
130
+ }
131
+ config .Net .TLS .Enable = true
132
+ config .Net .TLS .Config = & tls.Config {}
81
133
82
134
default :
83
- config .Net .SASL .Mechanism = sarama .SASLMechanism ( "PLAIN" )
135
+ config .Net .SASL .Mechanism = sarama .SASLTypePlaintext
84
136
}
85
137
86
138
config .Net .SASL .User = cfg .Key
@@ -98,8 +150,10 @@ func OpenSubscription(logger log.Logger, cfg *service.KafkaConfig) (*pubsub.Subs
98
150
Set ("tls" , log .Bool (cfg .TLS )).
99
151
Set ("group" , log .String (cfg .Group )).
100
152
Set ("sasl.enable" , log .Bool (config .Net .SASL .Enable )).
153
+ Set ("sasl.mechanism" , log .String (string (config .Net .SASL .Mechanism ))).
101
154
Set ("sasl.user" , log .String (cfg .Key )).
102
155
Set ("topic" , log .String (cfg .Topic )).
156
+ Set ("aws.region" , log .String (cfg .AWSRegion )).
103
157
Log ("setting up kafka subscription" )
104
158
105
159
return kafkapubsub .OpenSubscription (cfg .Brokers , config , cfg .Group , []string {cfg .Topic }, & kafkapubsub.SubscriptionOptions {
0 commit comments