From 12065e2519ad4de857e7d94328da021c919d2a54 Mon Sep 17 00:00:00 2001
From: Jason Lai
Date: Tue, 11 Feb 2020 19:13:25 -0800
Subject: [PATCH 1/5] Add SCRAM support for Kafka sink

---
 go.mod                |  1 +
 go.sum                |  2 ++
 sinks/interfaces.go   |  4 +++-
 sinks/kafkasink.go    | 31 ++++++++++++++++++++++++---
 sinks/scram_client.go | 50 +++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 84 insertions(+), 4 deletions(-)
 create mode 100644 sinks/scram_client.go

diff --git a/go.mod b/go.mod
index ea798b14a..75c743cdc 100644
--- a/go.mod
+++ b/go.mod
@@ -18,6 +18,7 @@ require (
 	github.com/rockset/rockset-go-client v0.6.0
 	github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0
 	github.com/spf13/viper v1.4.0
+	github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
 	gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
 	k8s.io/api v0.0.0-20190814101207-0772a1bdf941
 	k8s.io/apimachinery v0.0.0-20190814100815-533d101be9a6
diff --git a/go.sum b/go.sum
index ef75beeba..af690f47f 100644
--- a/go.sum
+++ b/go.sum
@@ -266,7 +266,9 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
+github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
+github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
 github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
diff --git a/sinks/interfaces.go b/sinks/interfaces.go
index c57098f7d..c8e6d39ee 100644
--- a/sinks/interfaces.go
+++ b/sinks/interfaces.go
@@ -65,6 +65,7 @@ func ManufactureSink() (e EventSinkInterface) {
 		viper.SetDefault("kafkaRetryMax", 5)
 		viper.SetDefault("kafkaSaslUser", "")
 		viper.SetDefault("kafkaSaslPwd", "")
+		viper.SetDefault("kafkaSaslMechanism", "")
 
 		brokers := viper.GetStringSlice("kafkaBrokers")
 		topic := viper.GetString("kafkaTopic")
@@ -72,8 +73,9 @@
 		retryMax := viper.GetInt("kafkaRetryMax")
 		saslUser := viper.GetString("kafkaSaslUser")
 		saslPwd := viper.GetString("kafkaSaslPwd")
+		saslMechanism := viper.GetString("kafkaSaslMechanism")
 
-		e, err := NewKafkaSink(brokers, topic, async, retryMax, saslUser, saslPwd)
+		e, err := NewKafkaSink(brokers, topic, async, retryMax, saslUser, saslPwd, saslMechanism)
 		if err != nil {
 			panic(err.Error())
 		}
diff --git a/sinks/kafkasink.go b/sinks/kafkasink.go
index 01cbfe391..025459708 100644
--- a/sinks/kafkasink.go
+++ b/sinks/kafkasink.go
@@ -17,7 +17,10 @@ limitations under the License.
 package sinks
 
 import (
+	"fmt"
+
 	"encoding/json"
+
 	"github.com/Shopify/sarama"
 	"github.com/golang/glog"
 	"k8s.io/api/core/v1"
@@ -29,10 +32,21 @@ type KafkaSink struct {
 	producer interface{}
 }
 
+var (
+	kafkaSASLMechanisms = map[sarama.SASLMechanism]func() sarama.SCRAMClient{
+		sarama.SASLTypeSCRAMSHA256: func() sarama.SCRAMClient {
+			return &XDGSCRAMClient{HashGeneratorFcn: scramSHA256}
+		},
+		sarama.SASLTypeSCRAMSHA512: func() sarama.SCRAMClient {
+			return &XDGSCRAMClient{HashGeneratorFcn: scramSHA512}
+		},
+	}
+)
+
 // NewKafkaSinkSink will create a new KafkaSink with default options, returned as an EventSinkInterface
-func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, saslUser string, saslPwd string) (EventSinkInterface, error) {
+func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, saslUser, saslPwd, saslMechanism string) (EventSinkInterface, error) {
 
-	p, err := sinkFactory(brokers, async, retryMax, saslUser, saslPwd)
+	p, err := sinkFactory(brokers, async, retryMax, saslUser, saslPwd, saslMechanism)
 
 	if err != nil {
 		return nil, err
@@ -44,7 +58,7 @@ func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, sasl
 	}, err
 }
 
-func sinkFactory(brokers []string, async bool, retryMax int, saslUser string, saslPwd string) (interface{}, error) {
+func sinkFactory(brokers []string, async bool, retryMax int, saslUser, saslPwd, saslMechanism string) (interface{}, error) {
 	config := sarama.NewConfig()
 	config.Producer.Retry.Max = retryMax
 	config.Producer.RequiredAcks = sarama.WaitForAll
@@ -53,6 +67,17 @@
 		config.Net.SASL.Enable = true
 		config.Net.SASL.User = saslUser
 		config.Net.SASL.Password = saslPwd
+
+		if saslMechanism != "" {
+			mechanism := sarama.SASLMechanism(saslMechanism)
+			generatorFunc, ok := kafkaSASLMechanisms[mechanism]
+			if !ok {
+				return nil, fmt.Errorf("unknown SASL mechanism name: %q", saslMechanism)
+			}
+
+			config.Net.SASL.Mechanism = mechanism
+			config.Net.SASL.SCRAMClientGeneratorFunc = generatorFunc
+		}
 	}
 
 	if async {
diff --git a/sinks/scram_client.go b/sinks/scram_client.go
new file mode 100644
index 000000000..884ec9cd5
--- /dev/null
+++ b/sinks/scram_client.go
@@ -0,0 +1,50 @@
+/*
+Copyright 2017 The Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package sinks
+
+import (
+	"crypto/sha512"
+
+	"github.com/xdg/scram"
+)
+
+var (
+	scramSHA256 = scram.SHA256
+	scramSHA512 = scram.HashGeneratorFcn(sha512.New)
+)
+
+type XDGSCRAMClient struct {
+	*scram.Client
+	*scram.ClientConversation
+	scram.HashGeneratorFcn
+}
+
+func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
+	x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
+	if err == nil {
+		x.ClientConversation = x.Client.NewConversation()
+	}
+	return
+}
+
+func (x *XDGSCRAMClient) Step(challenge string) (string, error) {
+	return x.ClientConversation.Step(challenge)
+}
+
+func (x *XDGSCRAMClient) Done() bool {
+	return x.ClientConversation.Done()
+}

From 950acb0c61e6a78fa39b96f3ff0e724a29d909ce Mon Sep 17 00:00:00 2001
From: Jason Lai
Date: Wed, 12 Feb 2020 17:58:14 -0800
Subject: [PATCH 2/5] Add TLS support for Kafka sink

---
 sinks/kafkasink.go | 63 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 61 insertions(+), 2 deletions(-)

diff --git a/sinks/kafkasink.go b/sinks/kafkasink.go
index 025459708..953f13f35 100644
--- a/sinks/kafkasink.go
+++ b/sinks/kafkasink.go
@@ -17,9 +17,13 @@ limitations under the License.
 package sinks
 
 import (
+	"errors"
 	"fmt"
 
+	"crypto/tls"
+	"crypto/x509"
 	"encoding/json"
+	"io/ioutil"
 
 	"github.com/Shopify/sarama"
 	"github.com/golang/glog"
@@ -32,7 +36,15 @@ type KafkaSink struct {
 	producer interface{}
 }
 
+type kafkaTLSConfig struct {
+	CertFile    string
+	KeyFile     string
+	CACertFiles []string
+}
+
 var (
+	errMissingCertOrKeyFile = errors.New("one of certificate file or key file for client authentication missing")
+
 	kafkaSASLMechanisms = map[sarama.SASLMechanism]func() sarama.SCRAMClient{
 		sarama.SASLTypeSCRAMSHA256: func() sarama.SCRAMClient {
 			return &XDGSCRAMClient{HashGeneratorFcn: scramSHA256}
 		},
@@ -46,7 +58,7 @@ var (
 // NewKafkaSinkSink will create a new KafkaSink with default options, returned as an EventSinkInterface
 func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, saslUser, saslPwd, saslMechanism string) (EventSinkInterface, error) {
 
-	p, err := sinkFactory(brokers, async, retryMax, saslUser, saslPwd, saslMechanism)
+	p, err := sinkFactory(brokers, async, retryMax, saslUser, saslPwd, saslMechanism, nil)
 
 	if err != nil {
 		return nil, err
@@ -58,7 +70,7 @@ func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, sasl
 	}, err
 }
 
-func sinkFactory(brokers []string, async bool, retryMax int, saslUser, saslPwd, saslMechanism string) (interface{}, error) {
+func sinkFactory(brokers []string, async bool, retryMax int, saslUser, saslPwd, saslMechanism string, ktlsConfig *kafkaTLSConfig) (interface{}, error) {
 	config := sarama.NewConfig()
 	config.Producer.Retry.Max = retryMax
 	config.Producer.RequiredAcks = sarama.WaitForAll
@@ -80,6 +92,15 @@ func sinkFactory(brokers []string, async bool, retryMax int, saslUser, saslPwd,
 		}
 	}
 
+	if ktlsConfig != nil {
+		tlsConfig, err := ktlsConfig.AsTLSConfig()
+		if err != nil {
+			return nil, err
+		}
+		config.Net.TLS.Enable = true
+		config.Net.TLS.Config = tlsConfig
+	}
+
 	if async {
 		return sarama.NewAsyncProducer(brokers, config)
 	}
@@ -125,3 +146,41 @@ func (ks *KafkaSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event) {
 	}
 
 }
+
+func (c *kafkaTLSConfig) AsTLSConfig() (*tls.Config, error) {
+	var certs []tls.Certificate
+	var certPool *x509.CertPool
+
+	if c.CertFile != "" && c.KeyFile != "" {
+		cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
+		if err != nil {
+			return nil, err
+		}
+
+		certs = []tls.Certificate{cert}
+	} else if c.CertFile != "" || c.KeyFile != "" {
+		return nil, errMissingCertOrKeyFile
+	}
+
+	for _, caCertFile := range c.CACertFiles {
+		caCert, err := ioutil.ReadFile(caCertFile)
+		if err != nil {
+			return nil, err
+		}
+
+		if certPool == nil {
+			certPool = x509.NewCertPool()
+		}
+		certPool.AppendCertsFromPEM(caCert)
+	}
+
+	cfg := tls.Config{}
+	if certs != nil {
+		cfg.Certificates = certs
+	}
+	if certPool != nil {
+		cfg.RootCAs = certPool
+	}
+
+	return &cfg, nil
+}

From 974b8d017b7eba95f08c6be360db4773a9c4ac73 Mon Sep 17 00:00:00 2001
From: Jason Lai
Date: Thu, 13 Feb 2020 16:47:01 -0800
Subject: [PATCH 3/5] Add Kafka config structs

---
 sinks/kafkasink.go | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/sinks/kafkasink.go b/sinks/kafkasink.go
index 953f13f35..efd38d423 100644
--- a/sinks/kafkasink.go
+++ b/sinks/kafkasink.go
@@ -36,6 +36,20 @@ type KafkaSink struct {
 	producer interface{}
 }
 
+type kafkaConfig struct {
+	Brokers  []string
+	Async    bool
+	RetryMax int
+	SASL     kafkaSASLConfig
+	TLS      *kafkaTLSConfig
+}
+
+type kafkaSASLConfig struct {
+	Username  string
+	Password  string
+	Mechanism string
+}
+
 type kafkaTLSConfig struct {
 	CertFile    string
 	KeyFile     string

From 256252d9d4f7647dd654b96dc4ea0de7fa0bf74b Mon Sep 17 00:00:00 2001
From: Jason Lai
Date: Tue, 18 Feb 2020 10:27:11 -0800
Subject: [PATCH 4/5] Create Kafka producer from kafkaConfig instead

---
 sinks/kafkasink.go | 37 +++++++++++++++++++++++--------------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git a/sinks/kafkasink.go b/sinks/kafkasink.go
index efd38d423..ea712a8e8 100644
--- a/sinks/kafkasink.go
+++ b/sinks/kafkasink.go
@@ -72,7 +72,16 @@ var (
 )
 
 // NewKafkaSinkSink will create a new KafkaSink with default options, returned as an EventSinkInterface
 func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, saslUser, saslPwd, saslMechanism string) (EventSinkInterface, error) {
 
-	p, err := sinkFactory(brokers, async, retryMax, saslUser, saslPwd, saslMechanism, nil)
+	p, err := sinkFactory(&kafkaConfig{
+		Brokers:  brokers,
+		Async:    async,
+		RetryMax: retryMax,
+		SASL: kafkaSASLConfig{
+			Username:  saslUser,
+			Password:  saslPwd,
+			Mechanism: saslMechanism,
+		},
+	})
 
 	if err != nil {
 		return nil, err
@@ -84,21 +93,21 @@ func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, sasl
 	}, err
 }
 
-func sinkFactory(brokers []string, async bool, retryMax int, saslUser, saslPwd, saslMechanism string, ktlsConfig *kafkaTLSConfig) (interface{}, error) {
+func sinkFactory(cfg *kafkaConfig) (interface{}, error) {
 	config := sarama.NewConfig()
-	config.Producer.Retry.Max = retryMax
+	config.Producer.Retry.Max = cfg.RetryMax
 	config.Producer.RequiredAcks = sarama.WaitForAll
 
-	if saslUser != "" && saslPwd != "" {
+	if cfg.SASL.Username != "" && cfg.SASL.Password != "" {
 		config.Net.SASL.Enable = true
-		config.Net.SASL.User = saslUser
-		config.Net.SASL.Password = saslPwd
+		config.Net.SASL.User = cfg.SASL.Username
+		config.Net.SASL.Password = cfg.SASL.Password
 
-		if saslMechanism != "" {
-			mechanism := sarama.SASLMechanism(saslMechanism)
+		if cfg.SASL.Mechanism != "" {
+			mechanism := sarama.SASLMechanism(cfg.SASL.Mechanism)
 			generatorFunc, ok := kafkaSASLMechanisms[mechanism]
 			if !ok {
-				return nil, fmt.Errorf("unknown SASL mechanism name: %q", saslMechanism)
+				return nil, fmt.Errorf("unknown SASL mechanism name: %q", cfg.SASL.Mechanism)
 			}
 
 			config.Net.SASL.Mechanism = mechanism
@@ -106,8 +115,8 @@ func sinkFactory(brokers []string, async bool, retryMax int, saslUser, saslPwd,
 		}
 	}
 
-	if ktlsConfig != nil {
-		tlsConfig, err := ktlsConfig.AsTLSConfig()
+	if cfg.TLS != nil {
+		tlsConfig, err := cfg.TLS.AsTLSConfig()
 		if err != nil {
 			return nil, err
 		}
@@ -115,12 +124,12 @@ func sinkFactory(brokers []string, async bool, retryMax int, saslUser, saslPwd,
 		config.Net.TLS.Config = tlsConfig
 	}
 
-	if async {
-		return sarama.NewAsyncProducer(brokers, config)
+	if cfg.Async {
+		return sarama.NewAsyncProducer(cfg.Brokers, config)
 	}
 
 	config.Producer.Return.Successes = true
-	return sarama.NewSyncProducer(brokers, config)
+	return sarama.NewSyncProducer(cfg.Brokers, config)
 }

From b0217620099f7389257add913c4c913b905d9f93 Mon Sep 17 00:00:00 2001
From: Jason Lai
Date: Thu, 20 Feb 2020 19:42:00 -0800
Subject: [PATCH 5/5] Add a new `kafkaTLS` config parameter for TLS settings

---
 sinks/interfaces.go | 27 ++++++++++++++++++++-------
 sinks/kafkasink.go  | 15 +++------------
 2 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/sinks/interfaces.go b/sinks/interfaces.go
index c8e6d39ee..1da3e0a78 100644
--- a/sinks/interfaces.go
+++ b/sinks/interfaces.go
@@ -67,15 +67,28 @@ func ManufactureSink() (e EventSinkInterface) {
 		viper.SetDefault("kafkaSaslPwd", "")
 		viper.SetDefault("kafkaSaslMechanism", "")
 
-		brokers := viper.GetStringSlice("kafkaBrokers")
+		var tlsCfg *kafkaTLSConfig
+		if viper.IsSet("kafkaTLS") {
+			tlsCfg = &kafkaTLSConfig{}
+			if err := viper.UnmarshalKey("kafkaTLS", tlsCfg); err != nil {
+				panic(err.Error())
+			}
+		}
+
+		cfg := kafkaConfig{
+			Brokers:  viper.GetStringSlice("kafkaBrokers"),
+			Async:    viper.GetBool("kafkaAsync"),
+			RetryMax: viper.GetInt("kafkaRetryMax"),
+			SASL: kafkaSASLConfig{
+				Username:  viper.GetString("kafkaSaslUser"),
+				Password:  viper.GetString("kafkaSaslPwd"),
+				Mechanism: viper.GetString("kafkaSaslMechanism"),
+			},
+			TLS: tlsCfg,
+		}
 		topic := viper.GetString("kafkaTopic")
-		async := viper.GetBool("kakfkaAsync")
-		retryMax := viper.GetInt("kafkaRetryMax")
-		saslUser := viper.GetString("kafkaSaslUser")
-		saslPwd := viper.GetString("kafkaSaslPwd")
-		saslMechanism := viper.GetString("kafkaSaslMechanism")
 
-		e, err := NewKafkaSink(brokers, topic, async, retryMax, saslUser, saslPwd, saslMechanism)
+		e, err := NewKafkaSink(cfg, topic)
 		if err != nil {
 			panic(err.Error())
 		}
diff --git a/sinks/kafkasink.go b/sinks/kafkasink.go
index ea712a8e8..e7a4c5e5d 100644
--- a/sinks/kafkasink.go
+++ b/sinks/kafkasink.go
@@ -70,18 +70,9 @@ var (
 )
 
 // NewKafkaSinkSink will create a new KafkaSink with default options, returned as an EventSinkInterface
-func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, saslUser, saslPwd, saslMechanism string) (EventSinkInterface, error) {
-
-	p, err := sinkFactory(&kafkaConfig{
-		Brokers:  brokers,
-		Async:    async,
-		RetryMax: retryMax,
-		SASL: kafkaSASLConfig{
-			Username:  saslUser,
-			Password:  saslPwd,
-			Mechanism: saslMechanism,
-		},
-	})
+func NewKafkaSink(cfg kafkaConfig, topic string) (EventSinkInterface, error) {
+
+	p, err := sinkFactory(&cfg)
 
 	if err != nil {
 		return nil, err