Skip to content

Commit 4a3b779

Browse files
authored
adding kafka sasl support (#865)
1 parent aa6a547 commit 4a3b779

File tree

4 files changed

+50
-1
lines changed

4 files changed

+50
-1
lines changed

cmd/ktranslate/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,12 @@ func applyFlags(cfg *ktranslate.Config) error {
600600
cfg.KafkaSink.BootstrapServers = val
601601
case "kafka.tls.config":
602602
cfg.KafkaSink.TlsConfig = val
603+
case "kafka.sasl.user":
604+
cfg.KafkaSink.SaslUser = val
605+
case "kafka.sasl.password":
606+
cfg.KafkaSink.SaslPass = val
607+
case "kafka.sasl.mechanism":
608+
cfg.KafkaSink.SaslMech = val
603609
// pkg/sinks/kentik
604610
case "kentik_relay_url":
605611
cfg.KentikSink.RelayURL = val

config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ type KafkaSinkConfig struct {
134134
Topic string
135135
BootstrapServers string
136136
TlsConfig string
137+
SaslUser string
138+
SaslPass string
139+
SaslMech string
137140
}
138141

139142
// KentikSinkConfig is the config for the Kentik sink
@@ -472,6 +475,9 @@ func DefaultConfig() *Config {
472475
Topic: "",
473476
BootstrapServers: "",
474477
TlsConfig: "",
478+
SaslUser: "",
479+
SaslPass: "",
480+
SaslMech: "",
475481
},
476482
KentikSink: &KentikSinkConfig{
477483
RelayURL: "",

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ require (
148148
github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect
149149
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
150150
github.com/xanzy/ssh-agent v0.3.3 // indirect
151+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
152+
github.com/xdg/stringprep v1.0.0 // indirect
151153
go.mongodb.org/mongo-driver v1.12.1 // indirect
152154
go.opencensus.io v0.24.0 // indirect
153155
go.opentelemetry.io/auto/sdk v1.1.0 // indirect

pkg/sinks/kafka/kafa.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,27 @@ import (
1515
"github.com/kentik/ktranslate/pkg/formats"
1616
"github.com/kentik/ktranslate/pkg/kt"
1717
kafka "github.com/segmentio/kafka-go"
18+
"github.com/segmentio/kafka-go/sasl"
19+
"github.com/segmentio/kafka-go/sasl/plain"
20+
"github.com/segmentio/kafka-go/sasl/scram"
1821
)
1922

2023
var (
2124
topic string
2225
bootstrapServers string
2326
tlsConfig string
27+
saslUser string
28+
saslPass string
29+
saslMech string
2430
)
2531

2632
func init() {
2733
flag.StringVar(&topic, "kafka_topic", "", "kafka topic to produce on")
2834
flag.StringVar(&bootstrapServers, "bootstrap.servers", "", "bootstrap.servers")
2935
flag.StringVar(&tlsConfig, "kafka.tls.config", "", "tls config to use for kafka. Can be basic|cert.pem,key.pem|cert.pem,key.pem,ca-cert.pem")
36+
flag.StringVar(&saslUser, "kafka.sasl.user", "", "kafka user")
37+
flag.StringVar(&saslPass, "kafka.sasl.password", "", "kafka password")
38+
flag.StringVar(&saslMech, "kafka.sasl.mechanism", "", "plain|scram")
3039
}
3140

3241
/**
@@ -115,10 +124,36 @@ func (s *KafkaSink) Init(ctx context.Context, format formats.Format, compression
115124
Balancer: &kafka.LeastBytes{},
116125
}
117126

127+
var mech sasl.Mechanism
128+
129+
if s.config.SaslMech != "" {
130+
s.Infof("Adding Sasl Support")
131+
switch s.config.SaslMech {
132+
case "plain":
133+
mech = plain.Mechanism{
134+
Username: s.config.SaslUser,
135+
Password: s.config.SaslPass,
136+
}
137+
case "scram":
138+
mechl, err := scram.Mechanism(scram.SHA512, s.config.SaslUser, s.config.SaslPass)
139+
if err != nil {
140+
return err
141+
}
142+
mech = mechl
143+
default:
144+
return fmt.Errorf("Invalid kafka.sasl.mechanism flag=%s. Valid is plain|scram", s.config.SaslMech)
145+
}
146+
}
147+
118148
if s.tlsConf != nil {
119149
s.Infof("Adding TLS Support")
120150
s.kp.Transport = &kafka.Transport{
121-
TLS: s.tlsConf,
151+
TLS: s.tlsConf,
152+
SASL: mech,
153+
}
154+
} else if mech != nil {
155+
s.kp.Transport = &kafka.Transport{
156+
SASL: mech,
122157
}
123158
}
124159

0 commit comments

Comments
 (0)