1515package kafka
1616
1717import (
18+ "crypto/tls"
19+ "crypto/x509"
1820 "encoding/json"
1921 "fmt"
22+ "io/ioutil"
2023 "net/url"
24+ "strconv"
2125 "time"
2226
27+ kafka "github.com/Shopify/sarama"
2328 "github.com/golang/glog"
24- "github.com/optiopay/kafka"
25- "github.com/optiopay/kafka/proto"
2629)
2730
2831const (
29- brokerClientID = "kafka-sink"
30- brokerDialTimeout = 10 * time .Second
31- brokerDialRetryLimit = 1
32- brokerDialRetryWait = 0
33- brokerAllowTopicCreation = true
34- brokerLeaderRetryLimit = 1
35- brokerLeaderRetryWait = 0
36- metricsTopic = "heapster-metrics"
37- eventsTopic = "heapster-events"
32+ brokerClientID = "kafka-sink"
33+ brokerDialTimeout = 10 * time .Second
34+ brokerDialRetryLimit = 1
35+ brokerDialRetryWait = 0
36+ brokerLeaderRetryLimit = 1
37+ brokerLeaderRetryWait = 0
38+ metricsTopic = "heapster-metrics"
39+ eventsTopic = "heapster-events"
3840)
3941
// Recognized values for the topicType argument of NewKafkaClient,
// selecting which kind of default topic the sink publishes to.
const (
	TimeSeriesTopic = "timeseriestopic"
	EventsTopic     = "eventstopic"
)
4546
4647type KafkaClient interface {
@@ -50,7 +51,7 @@ type KafkaClient interface {
5051}
5152
5253type kafkaSink struct {
53- producer kafka.DistributingProducer
54+ producer kafka.SyncProducer
5455 dataTopic string
5556}
5657
@@ -61,13 +62,16 @@ func (sink *kafkaSink) ProduceKafkaMessage(msgData interface{}) error {
6162 return fmt .Errorf ("failed to transform the items to json : %s" , err )
6263 }
6364
64- message := & proto.Message {Value : []byte (string (msgJson ))}
65- _ , err = sink .producer .Distribute (sink .dataTopic , message )
65+ _ , _ , err = sink .producer .SendMessage (& kafka.ProducerMessage {
66+ Topic : sink .dataTopic ,
67+ Key : nil ,
68+ Value : kafka .ByteEncoder (msgJson ),
69+ })
6670 if err != nil {
6771 return fmt .Errorf ("failed to produce message to %s: %s" , sink .dataTopic , err )
6872 }
6973 end := time .Now ()
70- glog .V (4 ).Infof ("Exported %d data to kafka in %s" , len ([] byte ( string ( msgJson )) ), end .Sub (start ))
74+ glog .V (4 ).Infof ("Exported %d data to kafka in %s" , len (msgJson ), end .Sub (start ))
7175 return nil
7276}
7377
@@ -76,33 +80,7 @@ func (sink *kafkaSink) Name() string {
7680}
7781
7882func (sink * kafkaSink ) Stop () {
79- // nothing needs to be done.
80- }
81-
82- // setupProducer returns a producer of kafka server
83- func setupProducer (sinkBrokerHosts []string , topic string , brokerConf kafka.BrokerConf , compression proto.Compression ) (kafka.DistributingProducer , error ) {
84- glog .V (3 ).Infof ("attempting to setup kafka sink" )
85- broker , err := kafka .Dial (sinkBrokerHosts , brokerConf )
86- if err != nil {
87- return nil , fmt .Errorf ("failed to connect to kafka cluster: %s" , err )
88- }
89- defer broker .Close ()
90-
91- //create kafka producer
92- conf := kafka .NewProducerConf ()
93- conf .RequiredAcks = proto .RequiredAcksLocal
94- conf .Compression = compression
95- producer := broker .Producer (conf )
96-
97- // create RoundRobinProducer with the default producer.
98- count , err := broker .PartitionCount (topic )
99- if err != nil {
100- count = 1
101- glog .Warningf ("Failed to get partition count of topic %q: %s" , topic , err )
102- }
103- sinkProducer := kafka .NewRoundRobinProducer (producer , count )
104- glog .V (3 ).Infof ("kafka sink setup successfully" )
105- return sinkProducer , nil
83+ sink .producer .Close ()
10684}
10785
10886func getTopic (opts map [string ][]string , topicType string ) (string , error ) {
@@ -123,27 +101,93 @@ func getTopic(opts map[string][]string, topicType string) (string, error) {
123101 return topic , nil
124102}
125103
126- func getCompression (opts map [ string ][] string ) (proto. Compression , error ) {
127- if len (opts [compression ]) == 0 {
128- return proto .CompressionNone , nil
104+ func getCompression (opts url. Values ) (kafka. CompressionCodec , error ) {
105+ if len (opts [" compression" ]) == 0 {
106+ return kafka .CompressionNone , nil
129107 }
130- comp := opts [compression ][0 ]
108+ comp := opts [" compression" ][0 ]
131109 switch comp {
132110 case "none" :
133- return proto .CompressionNone , nil
111+ return kafka .CompressionNone , nil
134112 case "gzip" :
135- return proto .CompressionGzip , nil
113+ return kafka .CompressionGZIP , nil
114+ case "snappy" :
115+ return kafka .CompressionSnappy , nil
116+ case "lz4" :
117+ return kafka .CompressionLZ4 , nil
136118 default :
137- return proto .CompressionNone , fmt .Errorf ("Compression '%s' is illegal. Use none or gzip" , comp )
119+ return kafka .CompressionNone , fmt .Errorf ("Compression '%s' is illegal. Use none, snappy, lz4 or gzip" , comp )
120+ }
121+ }
122+
123+ func getTlsConfiguration (opts url.Values ) (* tls.Config , bool , error ) {
124+ if len (opts ["cacert" ]) == 0 &&
125+ (len (opts ["cert" ]) == 0 || len (opts ["key" ]) == 0 ) {
126+ return nil , false , nil
127+ }
128+ t := & tls.Config {}
129+ if len (opts ["cacert" ]) != 0 {
130+ caFile := opts ["cacert" ][0 ]
131+ caCert , err := ioutil .ReadFile (caFile )
132+ if err != nil {
133+ return nil , false , err
134+ }
135+ caCertPool := x509 .NewCertPool ()
136+ caCertPool .AppendCertsFromPEM (caCert )
137+ t .RootCAs = caCertPool
138138 }
139+
140+ if len (opts ["cert" ]) != 0 && len (opts ["key" ]) != 0 {
141+ certFile := opts ["cert" ][0 ]
142+ keyFile := opts ["key" ][0 ]
143+ cert , err := tls .LoadX509KeyPair (certFile , keyFile )
144+ if err != nil {
145+ return nil , false , err
146+ }
147+ t .Certificates = []tls.Certificate {cert }
148+ }
149+ if len (opts ["insecuressl" ]) != 0 {
150+ insecuressl := opts ["insecuressl" ][0 ]
151+ insecure , err := strconv .ParseBool (insecuressl )
152+ if err != nil {
153+ return nil , false , err
154+ }
155+ t .InsecureSkipVerify = insecure
156+ }
157+
158+ return t , true , nil
159+ }
160+
161+ func getSASLConfiguration (opts url.Values ) (string , string , bool , error ) {
162+ if len (opts ["user" ]) == 0 {
163+ return "" , "" , false , nil
164+ }
165+ user := opts ["user" ][0 ]
166+ if len (opts ["password" ]) == 0 {
167+ return "" , "" , false , nil
168+ }
169+ password := opts ["password" ][0 ]
170+ return user , password , true , nil
171+ }
172+
173+ // usageKafkaclient return sink options with *** for password option.
174+ func usageKafkaclient (values url.Values ) string {
175+ var password []string
176+ if len (values ["password" ]) != 0 {
177+ password = values ["password" ]
178+ values ["password" ] = []string {"***" }
179+ defer func () { values ["password" ] = password }()
180+ }
181+ options := fmt .Sprintf ("kafka sink option: %v" , values )
182+ return options
139183}
140184
141185func NewKafkaClient (uri * url.URL , topicType string ) (KafkaClient , error ) {
142186 opts , err := url .ParseQuery (uri .RawQuery )
143187 if err != nil {
144188 return nil , fmt .Errorf ("failed to parse url's query string: %s" , err )
145189 }
146- glog .V (3 ).Infof ( "kafka sink option: %v" , opts )
190+ glog .V (3 ).Info ( usageKafkaclient ( opts ) )
147191
148192 topic , err := getTopic (opts , topicType )
149193 if err != nil {
@@ -162,22 +206,40 @@ func NewKafkaClient(uri *url.URL, topicType string) (KafkaClient, error) {
162206 kafkaBrokers = append (kafkaBrokers , opts ["brokers" ]... )
163207 glog .V (2 ).Infof ("initializing kafka sink with brokers - %v" , kafkaBrokers )
164208
209+ kafka .Logger = GologAdapterLogger {}
210+
165211 //structure the config of broker
166- brokerConf := kafka .NewBrokerConf (brokerClientID )
167- brokerConf .DialTimeout = brokerDialTimeout
168- brokerConf .DialRetryLimit = brokerDialRetryLimit
169- brokerConf .DialRetryWait = brokerDialRetryWait
170- brokerConf .LeaderRetryLimit = brokerLeaderRetryLimit
171- brokerConf .LeaderRetryWait = brokerLeaderRetryWait
172- brokerConf .AllowTopicCreation = brokerAllowTopicCreation
173- brokerConf .Logger = & GologAdapterLogger {}
212+ config := kafka .NewConfig ()
213+ config .ClientID = brokerClientID
214+ config .Net .DialTimeout = brokerDialTimeout
215+ config .Metadata .Retry .Max = brokerDialRetryLimit
216+ config .Metadata .Retry .Backoff = brokerDialRetryWait
217+ config .Producer .Retry .Max = brokerLeaderRetryLimit
218+ config .Producer .Retry .Backoff = brokerLeaderRetryWait
219+ config .Producer .Compression = compression
220+ config .Producer .Partitioner = kafka .NewRoundRobinPartitioner
221+ config .Producer .RequiredAcks = kafka .WaitForLocal
222+ config .Producer .Return .Errors = true
223+ config .Producer .Return .Successes = true
224+
225+ config .Net .TLS .Config , config .Net .TLS .Enable , err = getTlsConfiguration (opts )
226+ if err != nil {
227+ return nil , err
228+ }
229+
230+ config .Net .SASL .User , config .Net .SASL .Password , config .Net .SASL .Enable , err = getSASLConfiguration (opts )
231+ if err != nil {
232+ return nil , err
233+ }
174234
175235 // set up producer of kafka server.
176- sinkProducer , err := setupProducer (kafkaBrokers , topic , brokerConf , compression )
236+ glog .V (3 ).Infof ("attempting to setup kafka sink" )
237+ sinkProducer , err := kafka .NewSyncProducer (kafkaBrokers , config )
177238 if err != nil {
178239 return nil , fmt .Errorf ("Failed to setup Producer: - %v" , err )
179240 }
180241
242+ glog .V (3 ).Infof ("kafka sink setup successfully" )
181243 return & kafkaSink {
182244 producer : sinkProducer ,
183245 dataTopic : topic ,
0 commit comments