1515package kafka
1616
1717import (
18+ "crypto/tls"
19+ "crypto/x509"
1820 "encoding/json"
1921 "fmt"
22+ "io/ioutil"
23+ "log"
2024 "net/url"
25+ "strconv"
2126 "time"
2227
28+ kafka "github.com/Shopify/sarama"
2329 "github.com/golang/glog"
24- "github.com/optiopay/kafka"
25- "github.com/optiopay/kafka/proto"
2630)
2731
const (
	// brokerClientID identifies this sink in kafka broker logs and metrics.
	brokerClientID = "kafka-sink"
	// brokerDialTimeout bounds a single broker connection attempt
	// (wired to config.Net.DialTimeout in NewKafkaClient).
	brokerDialTimeout = 10 * time.Second
	// brokerDialRetryLimit/brokerDialRetryWait feed sarama's metadata
	// retry settings (config.Metadata.Retry.Max / .Backoff).
	brokerDialRetryLimit = 1
	brokerDialRetryWait  = 0
	// brokerLeaderRetryLimit/brokerLeaderRetryWait feed the producer
	// retry settings (config.Producer.Retry.Max / .Backoff).
	brokerLeaderRetryLimit = 1
	brokerLeaderRetryWait  = 0
	// metricsTopic/eventsTopic: presumably the default topic names used
	// when the sink URL does not specify one — confirm against getTopic.
	metricsTopic = "heapster-metrics"
	eventsTopic  = "heapster-events"
)
3942
const (
	// TimeSeriesTopic and EventsTopic are the topicType selector values
	// accepted by NewKafkaClient (forwarded to getTopic).
	TimeSeriesTopic = "timeseriestopic"
	EventsTopic     = "eventstopic"
)
4547
4648type KafkaClient interface {
@@ -50,7 +52,7 @@ type KafkaClient interface {
5052}
5153
type kafkaSink struct {
	// producer is the sarama async producer messages are queued on.
	producer kafka.AsyncProducer
	// dataTopic is the kafka topic this sink publishes to.
	dataTopic string
}
5658
@@ -61,10 +63,10 @@ func (sink *kafkaSink) ProduceKafkaMessage(msgData interface{}) error {
6163 return fmt .Errorf ("failed to transform the items to json : %s" , err )
6264 }
6365
64- message := & proto. Message { Value : [] byte ( string ( msgJson ))}
65- _ , err = sink .producer . Distribute ( sink . dataTopic , message )
66- if err != nil {
67- return fmt . Errorf ( "failed to produce message to %s: %s" , sink . dataTopic , err )
66+ sink . producer . Input () <- & kafka. ProducerMessage {
67+ Topic : sink .dataTopic ,
68+ Key : nil ,
69+ Value : kafka . ByteEncoder ([] byte ( string ( msgJson ))),
6870 }
6971 end := time .Now ()
7072 glog .V (4 ).Infof ("Exported %d data to kafka in %s" , len ([]byte (string (msgJson ))), end .Sub (start ))
@@ -76,33 +78,20 @@ func (sink *kafkaSink) Name() string {
7678}
7779
7880func (sink * kafkaSink ) Stop () {
79- // nothing needs to be done.
81+ sink . producer . Close ()
8082}
8183
8284// setupProducer returns a producer of kafka server
83- func setupProducer (sinkBrokerHosts []string , topic string , brokerConf kafka.BrokerConf , compression proto. Compression ) (kafka.DistributingProducer , error ) {
85+ func setupProducer (sinkBrokerHosts []string , config * kafka.Config ) (kafka.AsyncProducer , error ) {
8486 glog .V (3 ).Infof ("attempting to setup kafka sink" )
85- broker , err := kafka .Dial (sinkBrokerHosts , brokerConf )
86- if err != nil {
87- return nil , fmt .Errorf ("failed to connect to kafka cluster: %s" , err )
88- }
89- defer broker .Close ()
9087
9188 //create kafka producer
92- conf := kafka .NewProducerConf ()
93- conf .RequiredAcks = proto .RequiredAcksLocal
94- conf .Compression = compression
95- producer := broker .Producer (conf )
96-
97- // create RoundRobinProducer with the default producer.
98- count , err := broker .PartitionCount (topic )
89+ producer , err := kafka .NewAsyncProducer (sinkBrokerHosts , config )
9990 if err != nil {
100- count = 1
101- glog .Warningf ("Failed to get partition count of topic %q: %s" , topic , err )
91+ return nil , err
10292 }
103- sinkProducer := kafka .NewRoundRobinProducer (producer , count )
10493 glog .V (3 ).Infof ("kafka sink setup successfully" )
105- return sinkProducer , nil
94+ return producer , nil
10695}
10796
10897func getTopic (opts map [string ][]string , topicType string ) (string , error ) {
@@ -123,27 +112,81 @@ func getTopic(opts map[string][]string, topicType string) (string, error) {
123112 return topic , nil
124113}
125114
126- func getCompression (opts map [ string ][] string ) (proto. Compression , error ) {
127- if len (opts [compression ]) == 0 {
128- return proto .CompressionNone , nil
115+ func getCompression (opts url. Values ) (kafka. CompressionCodec , error ) {
116+ if len (opts [" compression" ]) == 0 {
117+ return kafka .CompressionNone , nil
129118 }
130- comp := opts [compression ][0 ]
119+ comp := opts [" compression" ][0 ]
131120 switch comp {
132121 case "none" :
133- return proto .CompressionNone , nil
122+ return kafka .CompressionNone , nil
134123 case "gzip" :
135- return proto .CompressionGzip , nil
124+ return kafka .CompressionGZIP , nil
125+ case "snappy" :
126+ return kafka .CompressionSnappy , nil
127+ case "lz4" :
128+ return kafka .CompressionLZ4 , nil
136129 default :
137- return proto .CompressionNone , fmt .Errorf ("Compression '%s' is illegal. Use none or gzip" , comp )
130+ return kafka .CompressionNone , fmt .Errorf ("Compression '%s' is illegal. Use none or gzip" , comp )
131+ }
132+ }
133+
134+ func getTlsConfiguration (opts url.Values ) (* tls.Config , error ) {
135+ if len (opts ["cacert" ]) == 0 &&
136+ (len (opts ["cert" ]) == 0 || len (opts ["key" ]) == 0 ) {
137+ return nil , nil
138+ }
139+ t := & tls.Config {}
140+ if len (opts ["cacert" ]) != 0 {
141+ caFile := opts ["cacert" ][0 ]
142+ caCert , err := ioutil .ReadFile (caFile )
143+ if err != nil {
144+ log .Fatal (err )
145+ }
146+ caCertPool := x509 .NewCertPool ()
147+ caCertPool .AppendCertsFromPEM (caCert )
148+ t .RootCAs = caCertPool
149+ }
150+
151+ if len (opts ["cert" ]) != 0 && len (opts ["key" ]) != 0 {
152+ certFile := opts ["cert" ][0 ]
153+ keyFile := opts ["key" ][0 ]
154+ cert , err := tls .LoadX509KeyPair (certFile , keyFile )
155+ if err != nil {
156+ return nil , err
157+ }
158+ t .Certificates = []tls.Certificate {cert }
159+ }
160+ if len (opts ["insecuressl" ]) != 0 {
161+ insecuressl := opts ["insecuressl" ][0 ]
162+ insecure , err := strconv .ParseBool (insecuressl )
163+ if err != nil {
164+ return nil , err
165+ }
166+ t .InsecureSkipVerify = insecure
167+ }
168+
169+ return t , nil
170+ }
171+
172+ func getSASLConfiguration (opts url.Values ) (string , string , error ) {
173+ if len (opts ["user" ]) == 0 {
174+ return "" , "" , nil
138175 }
176+ user := opts ["user" ][0 ]
177+ if len (opts ["password" ]) == 0 {
178+ return "" , "" , nil
179+ }
180+ password := opts ["password" ][0 ]
181+ return user , password , nil
139182}
140183
141184func NewKafkaClient (uri * url.URL , topicType string ) (KafkaClient , error ) {
142185 opts , err := url .ParseQuery (uri .RawQuery )
143186 if err != nil {
144187 return nil , fmt .Errorf ("failed to parse url's query string: %s" , err )
145188 }
146- glog .V (3 ).Infof ("kafka sink option: %v" , opts )
189+ glog .V (6 ).Infof ("kafka sink option: %v" , opts )
147190
148191 topic , err := getTopic (opts , topicType )
149192 if err != nil {
@@ -155,25 +198,49 @@ func NewKafkaClient(uri *url.URL, topicType string) (KafkaClient, error) {
155198 return nil , err
156199 }
157200
201+ tlsConfig , err := getTlsConfiguration (opts )
202+ if err != nil {
203+ return nil , err
204+ }
205+
206+ saslUser , saslPassword , err := getSASLConfiguration (opts )
207+ if err != nil {
208+ return nil , err
209+ }
210+
158211 var kafkaBrokers []string
159212 if len (opts ["brokers" ]) < 1 {
160213 return nil , fmt .Errorf ("There is no broker assigned for connecting kafka" )
161214 }
162215 kafkaBrokers = append (kafkaBrokers , opts ["brokers" ]... )
163216 glog .V (2 ).Infof ("initializing kafka sink with brokers - %v" , kafkaBrokers )
164217
218+ kafka .Logger = GologAdapterLogger {}
219+
165220 //structure the config of broker
166- brokerConf := kafka .NewBrokerConf (brokerClientID )
167- brokerConf .DialTimeout = brokerDialTimeout
168- brokerConf .DialRetryLimit = brokerDialRetryLimit
169- brokerConf .DialRetryWait = brokerDialRetryWait
170- brokerConf .LeaderRetryLimit = brokerLeaderRetryLimit
171- brokerConf .LeaderRetryWait = brokerLeaderRetryWait
172- brokerConf .AllowTopicCreation = brokerAllowTopicCreation
173- brokerConf .Logger = & GologAdapterLogger {}
221+ config := kafka .NewConfig ()
222+ config .ClientID = brokerClientID
223+ config .Net .DialTimeout = brokerDialTimeout
224+ config .Metadata .Retry .Max = brokerDialRetryLimit
225+ config .Metadata .Retry .Backoff = brokerDialRetryWait
226+ config .Producer .Retry .Max = brokerLeaderRetryLimit
227+ config .Producer .Retry .Backoff = brokerLeaderRetryWait
228+ config .Producer .Compression = compression
229+ config .Producer .Partitioner = kafka .NewRoundRobinPartitioner
230+ config .Producer .RequiredAcks = kafka .WaitForLocal
231+
232+ if tlsConfig != nil {
233+ config .Net .TLS .Config = tlsConfig
234+ config .Net .TLS .Enable = true
235+ }
236+ if saslUser != "" && saslPassword != "" {
237+ config .Net .SASL .Enable = true
238+ config .Net .SASL .User = saslUser
239+ config .Net .SASL .Password = saslPassword
240+ }
174241
175242 // set up producer of kafka server.
176- sinkProducer , err := setupProducer (kafkaBrokers , topic , brokerConf , compression )
243+ sinkProducer , err := setupProducer (kafkaBrokers , config )
177244 if err != nil {
178245 return nil , fmt .Errorf ("Failed to setup Producer: - %v" , err )
179246 }