Skip to content

Commit f36a90a

Browse files
committed
Document AWS_MSK_IAM
1 parent 1aac24c commit f36a90a

File tree

6 files changed

+38
-16
lines changed

6 files changed

+38
-16
lines changed

README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,11 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
187187
--proxy-listener-write-buffer-size int Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
188188
--proxy-request-buffer-size int Request buffer size pro tcp connection (default 4096)
189189
--proxy-response-buffer-size int Response buffer size pro tcp connection (default 4096)
190+
--sasl-aws-profile string AWS profile
191+
--sasl-aws-region string Region for AWS IAM Auth
190192
--sasl-enable Connect using SASL
191193
--sasl-jaas-config-file string Location of JAAS config file with SASL username and password
192-
--sasl-method string SASL method to use (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (default "PLAIN")
194+
--sasl-method string SASL method to use (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS_MSK_IAM (default "PLAIN")
193195
--sasl-password string SASL user password
194196
--sasl-plugin-command string Path to authentication plugin binary
195197
--sasl-plugin-enable Use plugin for SASL authentication
@@ -279,6 +281,17 @@ GSSAPI / Kerberos authentication
279281
--gssapi-krb5 /etc/krb5.conf \
280282
--gssapi-keytab /etc/security/keytabs/kafka.keytab
281283
284+
AWS MSK IAM
285+
286+
kafka-proxy server --bootstrap-server-mapping "b-1-public.kafkaproxycluster.uls9ao.c4.kafka.eu-central-1.amazonaws.com:9198,0.0.0.0:30001" \
287+
--bootstrap-server-mapping "b-2-public.kafkaproxycluster.uls9ao.c4.kafka.eu-central-1.amazonaws.com:9198,0.0.0.0:30002" \
288+
--bootstrap-server-mapping "b-3-public.kafkaproxycluster.uls9ao.c4.kafka.eu-central-1.amazonaws.com:9198,0.0.0.0:30003" \
289+
--tls-enable --tls-insecure-skip-verify \
290+
--sasl-enable \
291+
--sasl-method "AWS_MSK_IAM" \
292+
--sasl-aws-region "eu-central-1" \
293+
--log-level debug
294+
282295
283296
### Proxy authentication example
284297

cmd/kafka-proxy/server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func initFlags() {
164164
Server.Flags().StringVar(&c.Kafka.SASL.Username, "sasl-username", "", "SASL user name")
165165
Server.Flags().StringVar(&c.Kafka.SASL.Password, "sasl-password", "", "SASL user password")
166166
Server.Flags().StringVar(&c.Kafka.SASL.JaasConfigFile, "sasl-jaas-config-file", "", "Location of JAAS config file with SASL username and password")
167-
Server.Flags().StringVar(&c.Kafka.SASL.Method, "sasl-method", "PLAIN", "SASL method to use (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI")
167+
Server.Flags().StringVar(&c.Kafka.SASL.Method, "sasl-method", "PLAIN", "SASL method to use (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS_MSK_IAM")
168168

169169
// SASL GSSAPI
170170
Server.Flags().StringVar(&c.Kafka.SASL.GSSAPI.AuthType, "gssapi-auth-type", config.KRB5_KEYTAB_AUTH, "GSSAPI auth type: KEYTAB or USER")
@@ -177,6 +177,10 @@ func initFlags() {
177177
Server.Flags().BoolVar(&c.Kafka.SASL.GSSAPI.DisablePAFXFAST, "gssapi-disable-pa-fx-fast", false, "Used to configure the client to not use PA_FX_FAST.")
178178
Server.Flags().StringToStringVar(&c.Kafka.SASL.GSSAPI.SPNHostsMapping, "gssapi-spn-host-mapping", map[string]string{}, "Mapping of Kafka servers address to SPN hosts")
179179

180+
// SASL AWS_MSK_IAM
181+
Server.Flags().StringVar(&c.Kafka.SASL.AWSConfig.Region, "sasl-aws-region", "", "Region for AWS IAM Auth")
182+
Server.Flags().StringVar(&c.Kafka.SASL.AWSConfig.Profile, "sasl-aws-profile", "", "AWS profile")
183+
180184
// SASL by Proxy plugin
181185
Server.Flags().BoolVar(&c.Kafka.SASL.Plugin.Enable, "sasl-plugin-enable", false, "Use plugin for SASL authentication")
182186
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.Command, "sasl-plugin-command", "", "Path to authentication plugin binary")
@@ -185,8 +189,6 @@ func initFlags() {
185189
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.LogLevel, "sasl-plugin-log-level", "trace", "Log level of the auth plugin")
186190
Server.Flags().DurationVar(&c.Kafka.SASL.Plugin.Timeout, "sasl-plugin-timeout", 10*time.Second, "Authentication timeout")
187191

188-
Server.Flags().StringVar(&c.Kafka.SASL.AWSRegion, "sasl-iam-region", "", "Region for AWS IAM Auth")
189-
190192
// Web
191193
Server.Flags().BoolVar(&c.Http.Disable, "http-disable", false, "Disable HTTP endpoints")
192194
Server.Flags().StringVar(&c.Http.ListenAddress, "http-listen-address", "0.0.0.0:9080", "Address that kafka-proxy is listening on")

config/config.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ type GSSAPIConfig struct {
4646
SPNHostsMapping map[string]string
4747
}
4848

49+
type AWSConfig struct {
50+
Region string
51+
Profile string
52+
}
53+
4954
type Config struct {
5055
Http struct {
5156
ListenAddress string
@@ -161,7 +166,7 @@ type Config struct {
161166
Timeout time.Duration
162167
}
163168
GSSAPI GSSAPIConfig
164-
AWSRegion string
169+
AWSConfig AWSConfig
165170
}
166171
Producer struct {
167172
Acks0Disabled bool

proxy/client.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,13 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
124124
gssapiConfig: &c.Kafka.SASL.GSSAPI,
125125
}
126126
} else if c.Kafka.SASL.Method == SASLIAMAAUTH {
127-
if awsMSKIAMAUTH, err := NewAwsMSKIamAuth(
127+
if saslAuthByProxy, err = NewAwsMSKIamAuth(
128128
c.Kafka.ClientID,
129-
c.Kafka.SASL.AWSRegion,
130129
c.Kafka.ReadTimeout,
131130
c.Kafka.WriteTimeout,
131+
&c.Kafka.SASL.AWSConfig,
132132
); err != nil {
133-
return nil, fmt.Errorf("Failed to create IAM Auth: %v", err)
134-
} else {
135-
saslAuthByProxy = awsMSKIAMAUTH
133+
return nil, errors.Errorf("Failed to create IAM Auth: %v", err)
136134
}
137135
} else {
138136
return nil, errors.Errorf("SASL Mechanism not valid '%s'", c.Kafka.SASL.Method)

proxy/sasl_aws_msk_iam.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/aws/aws-sdk-go-v2/config"
13+
proxyconfig "github.com/grepplabs/kafka-proxy/config"
1314
"github.com/grepplabs/kafka-proxy/proxy/protocol"
1415
"github.com/pkg/errors"
1516
"github.com/sirupsen/logrus"
@@ -28,15 +29,18 @@ type AwsMSKIamAuth struct {
2829

2930
func NewAwsMSKIamAuth(
3031
clientId string,
31-
region string,
3232
readTimeout,
3333
writeTimeout time.Duration,
34+
awsConfig *proxyconfig.AWSConfig,
3435
) (SASLAuthByProxy, error) {
3536
var optFns []func(*config.LoadOptions) error
36-
if region != "" {
37-
optFns = append(optFns, config.WithRegion(region))
37+
if awsConfig.Region != "" {
38+
optFns = append(optFns, config.WithRegion(awsConfig.Region))
3839
}
39-
cfg, err := config.LoadDefaultConfig(context.TODO(), optFns...)
40+
if awsConfig.Profile != "" {
41+
optFns = append(optFns, config.WithSharedConfigProfile(awsConfig.Profile))
42+
}
43+
cfg, err := config.LoadDefaultConfig(context.Background(), optFns...)
4044
if err != nil {
4145
return nil, fmt.Errorf("loading aws config: %v", err)
4246
}
@@ -98,7 +102,7 @@ func (a *AwsMSKIamAuth) saslAuthenticate(conn DeadlineReaderWriter, brokerString
98102
return fmt.Errorf("failed to parse host/port: %v", err)
99103
}
100104

101-
authBytes, err := a.signer.SASLToken(context.TODO(), host)
105+
authBytes, err := a.signer.SASLToken(context.Background(), host)
102106
if err != nil {
103107
return fmt.Errorf("failed to generate SASL token %v", err)
104108
}

proxy/sasl_scram.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (b *SASLSCRAMAuth) sendAndReceiveSASLHandshake(conn DeadlineReaderWriter) e
174174

175175
func (b *SASLSCRAMAuth) sendSaslAuthenticateRequest(conn DeadlineReaderWriter, correlationID int32, msg []byte) (int, error) {
176176
// rb := &SaslAuthenticateRequest{msg}
177-
rb := &protocol.SaslAuthenticateRequestV0{msg}
177+
rb := &protocol.SaslAuthenticateRequestV0{SaslAuthBytes: msg}
178178
//req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
179179
req := &protocol.Request{CorrelationID: correlationID, ClientID: b.clientID, Body: rb}
180180
//buf, err := encode(req, b.conf.MetricRegistry)

0 commit comments

Comments
 (0)