diff --git a/go.mod b/go.mod index ea798b14a..75c743cdc 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/rockset/rockset-go-client v0.6.0 github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0 github.com/spf13/viper v1.4.0 + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect k8s.io/api v0.0.0-20190814101207-0772a1bdf941 k8s.io/apimachinery v0.0.0-20190814100815-533d101be9a6 diff --git a/go.sum b/go.sum index ef75beeba..af690f47f 100644 --- a/go.sum +++ b/go.sum @@ -266,7 +266,9 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= diff --git a/sinks/interfaces.go b/sinks/interfaces.go index c57098f7d..1da3e0a78 100644 --- a/sinks/interfaces.go +++ b/sinks/interfaces.go @@ -65,15 +65,30 @@ func ManufactureSink() (e EventSinkInterface) { viper.SetDefault("kafkaRetryMax", 5) viper.SetDefault("kafkaSaslUser", "") viper.SetDefault("kafkaSaslPwd", "") - - brokers := viper.GetStringSlice("kafkaBrokers") + viper.SetDefault("kafkaSaslMechanism", "") + + var tlsCfg *kafkaTLSConfig + if viper.IsSet("kafkaTLS") { + tlsCfg = &kafkaTLSConfig{} + if err := viper.UnmarshalKey("kafkaTLS", tlsCfg); err != nil { + panic(err.Error()) + } + } + + cfg := kafkaConfig{ + Brokers: viper.GetStringSlice("kafkaBrokers"), + Async: viper.GetBool("kafkaAsync"), + RetryMax: viper.GetInt("kafkaRetryMax"), + SASL: kafkaSASLConfig{ + Username: viper.GetString("kafkaSaslUser"), + Password: viper.GetString("kafkaSaslPwd"), + Mechanism: viper.GetString("kafkaSaslMechanism"), + }, + TLS: tlsCfg, + } topic := viper.GetString("kafkaTopic") - async := viper.GetBool("kakfkaAsync") - retryMax := viper.GetInt("kafkaRetryMax") - saslUser := viper.GetString("kafkaSaslUser") - saslPwd := viper.GetString("kafkaSaslPwd") - e, err := NewKafkaSink(brokers, topic, async, retryMax, saslUser, saslPwd) + e, err := NewKafkaSink(cfg, topic) if err != nil { panic(err.Error()) } diff --git a/sinks/kafkasink.go b/sinks/kafkasink.go index 01cbfe391..e7a4c5e5d 100644 --- a/sinks/kafkasink.go +++ b/sinks/kafkasink.go @@ -17,7 +17,14 @@ limitations under the License. package sinks import ( + "errors" + "fmt" + + "crypto/tls" + "crypto/x509" "encoding/json" + "io/ioutil" + "github.com/Shopify/sarama" "github.com/golang/glog" "k8s.io/api/core/v1" @@ -29,10 +36,43 @@ type KafkaSink struct { producer interface{} } +type kafkaConfig struct { + Brokers []string + Async bool + RetryMax int + SASL kafkaSASLConfig + TLS *kafkaTLSConfig +} + +type kafkaSASLConfig struct { + Username string + Password string + Mechanism string +} + +type kafkaTLSConfig struct { + CertFile string + KeyFile string + CACertFiles []string +} + +var ( + errMissingCertOrKeyFile = errors.New("one of certificate file or key file for client authentication missing") + + kafkaSASLMechanisms = map[sarama.SASLMechanism]func() sarama.SCRAMClient { + sarama.SASLTypeSCRAMSHA256: func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: scramSHA256} + }, + sarama.SASLTypeSCRAMSHA512: func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: scramSHA512} + }, + } +) + // NewKafkaSinkSink will create a new KafkaSink with default options, returned as an EventSinkInterface -func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, saslUser string, saslPwd string) (EventSinkInterface, error) { +func NewKafkaSink(cfg kafkaConfig, topic string) (EventSinkInterface, error) { - p, err := sinkFactory(brokers, async, retryMax, saslUser, saslPwd) + p, err := sinkFactory(&cfg) if err != nil { return nil, err @@ -44,23 +84,43 @@ func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, sasl }, err } -func sinkFactory(brokers []string, async bool, retryMax int, saslUser string, saslPwd string) (interface{}, error) { +func sinkFactory(cfg *kafkaConfig) (interface{}, error) { config := sarama.NewConfig() - config.Producer.Retry.Max = retryMax + config.Producer.Retry.Max = cfg.RetryMax config.Producer.RequiredAcks = sarama.WaitForAll - if saslUser != "" && saslPwd != "" { + if cfg.SASL.Username != "" && cfg.SASL.Password != "" { config.Net.SASL.Enable = true - config.Net.SASL.User = saslUser - config.Net.SASL.Password = saslPwd + config.Net.SASL.User = cfg.SASL.Username + config.Net.SASL.Password = cfg.SASL.Password + + if cfg.SASL.Mechanism != "" { + mechanism := sarama.SASLMechanism(cfg.SASL.Mechanism) + generatorFunc, ok := kafkaSASLMechanisms[mechanism] + if !ok { + return nil, fmt.Errorf("unknown SASL mechanism name: %q", cfg.SASL.Mechanism) + } + + config.Net.SASL.Mechanism = mechanism + config.Net.SASL.SCRAMClientGeneratorFunc = generatorFunc + } + } + + if cfg.TLS != nil { + tlsConfig, err := cfg.TLS.AsTLSConfig() + if err != nil { + return nil, err + } + config.Net.TLS.Enable = true + config.Net.TLS.Config = tlsConfig } - if async { - return sarama.NewAsyncProducer(brokers, config) + if cfg.Async { + return sarama.NewAsyncProducer(cfg.Brokers, config) } config.Producer.Return.Successes = true - return sarama.NewSyncProducer(brokers, config) + return sarama.NewSyncProducer(cfg.Brokers, config) } @@ -100,3 +160,41 @@ func (ks *KafkaSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event) { } } + +func (c *kafkaTLSConfig) AsTLSConfig() (*tls.Config, error) { + var certs []tls.Certificate + var certPool *x509.CertPool + + if c.CertFile != "" && c.KeyFile != "" { + cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile) + if err != nil { + return nil, err + } + + certs = []tls.Certificate{cert} + } else if c.CertFile != "" || c.KeyFile != "" { + return nil, errMissingCertOrKeyFile + } + + for _, caCertFile := range c.CACertFiles { + caCert, err := ioutil.ReadFile(caCertFile) + if err != nil { + return nil, err + } + + if certPool == nil { + certPool = x509.NewCertPool() + } + certPool.AppendCertsFromPEM(caCert) + } + + cfg := tls.Config{} + if certs != nil { + cfg.Certificates = certs + } + if certPool != nil { + cfg.RootCAs = certPool + } + + return &cfg, nil +} diff --git a/sinks/scram_client.go b/sinks/scram_client.go new file mode 100644 index 000000000..884ec9cd5 --- /dev/null +++ b/sinks/scram_client.go @@ -0,0 +1,50 @@ +/* +Copyright 2017 The Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sinks + +import ( + "crypto/sha512" + + "github.com/xdg/scram" +) + +var ( + scramSHA256 = scram.SHA256 + scramSHA512 = scram.HashGeneratorFcn(sha512.New) +) + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err == nil { + x.ClientConversation = x.Client.NewConversation() + } + return +} + +func (x *XDGSCRAMClient) Step(challenge string) (string, error) { + return x.ClientConversation.Step(challenge) +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +}