@@ -20,7 +20,6 @@ import (
2020 "encoding/json"
2121 "fmt"
2222 "io/ioutil"
23- "log"
2423 "net/url"
2524 "strconv"
2625 "time"
@@ -52,7 +51,7 @@ type KafkaClient interface {
5251}
5352
// kafkaSink sends serialized data records to a single Kafka topic using a
// synchronous producer (each send blocks until the broker acks or errors,
// per config.Producer.RequiredAcks).
type kafkaSink struct {
	producer  kafka.SyncProducer // NOTE(review): sarama-style SyncProducer; closed by Stop()
	dataTopic string             // target topic for ProduceKafkaMessage
}
5857
@@ -63,10 +62,13 @@ func (sink *kafkaSink) ProduceKafkaMessage(msgData interface{}) error {
6362 return fmt .Errorf ("failed to transform the items to json : %s" , err )
6463 }
6564
66- sink .producer .Input () <- & kafka.ProducerMessage {
65+ _ , _ , err = sink .producer .SendMessage ( & kafka.ProducerMessage {
6766 Topic : sink .dataTopic ,
6867 Key : nil ,
6968 Value : kafka .ByteEncoder ([]byte (string (msgJson ))),
69+ })
70+ if err != nil {
71+ return fmt .Errorf ("failed to produce message to %s: %s" , sink .dataTopic , err )
7072 }
7173 end := time .Now ()
7274 glog .V (4 ).Infof ("Exported %d data to kafka in %s" , len ([]byte (string (msgJson ))), end .Sub (start ))
@@ -81,19 +83,6 @@ func (sink *kafkaSink) Stop() {
8183 sink .producer .Close ()
8284}
8385
84- // setupProducer returns a producer of kafka server
85- func setupProducer (sinkBrokerHosts []string , config * kafka.Config ) (kafka.AsyncProducer , error ) {
86- glog .V (3 ).Infof ("attempting to setup kafka sink" )
87-
88- //create kafka producer
89- producer , err := kafka .NewAsyncProducer (sinkBrokerHosts , config )
90- if err != nil {
91- return nil , err
92- }
93- glog .V (3 ).Infof ("kafka sink setup successfully" )
94- return producer , nil
95- }
96-
9786func getTopic (opts map [string ][]string , topicType string ) (string , error ) {
9887 var topic string
9988 switch topicType {
@@ -127,21 +116,21 @@ func getCompression(opts url.Values) (kafka.CompressionCodec, error) {
127116 case "lz4" :
128117 return kafka .CompressionLZ4 , nil
129118 default :
130- return kafka . CompressionNone , fmt .Errorf ("Compression '%s' is illegal. Use none or gzip" , comp )
119+ return - 1 , fmt .Errorf ("Compression '%s' is illegal. Use none, snappy, lz4 or gzip" , comp )
131120 }
132121}
133122
134- func getTlsConfiguration (opts url.Values ) (* tls.Config , error ) {
123+ func getTlsConfiguration (opts url.Values ) (* tls.Config , bool , error ) {
135124 if len (opts ["cacert" ]) == 0 &&
136125 (len (opts ["cert" ]) == 0 || len (opts ["key" ]) == 0 ) {
137- return nil , nil
126+ return nil , false , nil
138127 }
139128 t := & tls.Config {}
140129 if len (opts ["cacert" ]) != 0 {
141130 caFile := opts ["cacert" ][0 ]
142131 caCert , err := ioutil .ReadFile (caFile )
143132 if err != nil {
144- log . Fatal ( err )
133+ return nil , false , err
145134 }
146135 caCertPool := x509 .NewCertPool ()
147136 caCertPool .AppendCertsFromPEM (caCert )
@@ -153,32 +142,32 @@ func getTlsConfiguration(opts url.Values) (*tls.Config, error) {
153142 keyFile := opts ["key" ][0 ]
154143 cert , err := tls .LoadX509KeyPair (certFile , keyFile )
155144 if err != nil {
156- return nil , err
145+ return nil , false , err
157146 }
158147 t .Certificates = []tls.Certificate {cert }
159148 }
160149 if len (opts ["insecuressl" ]) != 0 {
161150 insecuressl := opts ["insecuressl" ][0 ]
162151 insecure , err := strconv .ParseBool (insecuressl )
163152 if err != nil {
164- return nil , err
153+ return nil , false , err
165154 }
166155 t .InsecureSkipVerify = insecure
167156 }
168157
169- return t , nil
158+ return t , true , nil
170159}
171160
172- func getSASLConfiguration (opts url.Values ) (string , string , error ) {
161+ func getSASLConfiguration (opts url.Values ) (string , string , bool , error ) {
173162 if len (opts ["user" ]) == 0 {
174- return "" , "" , nil
163+ return "" , "" , false , nil
175164 }
176165 user := opts ["user" ][0 ]
177166 if len (opts ["password" ]) == 0 {
178- return "" , "" , nil
167+ return "" , "" , false , nil
179168 }
180169 password := opts ["password" ][0 ]
181- return user , password , nil
170+ return user , password , true , nil
182171}
183172
184173func NewKafkaClient (uri * url.URL , topicType string ) (KafkaClient , error ) {
@@ -198,16 +187,6 @@ func NewKafkaClient(uri *url.URL, topicType string) (KafkaClient, error) {
198187 return nil , err
199188 }
200189
201- tlsConfig , err := getTlsConfiguration (opts )
202- if err != nil {
203- return nil , err
204- }
205-
206- saslUser , saslPassword , err := getSASLConfiguration (opts )
207- if err != nil {
208- return nil , err
209- }
210-
211190 var kafkaBrokers []string
212191 if len (opts ["brokers" ]) < 1 {
213192 return nil , fmt .Errorf ("There is no broker assigned for connecting kafka" )
@@ -228,23 +207,27 @@ func NewKafkaClient(uri *url.URL, topicType string) (KafkaClient, error) {
228207 config .Producer .Compression = compression
229208 config .Producer .Partitioner = kafka .NewRoundRobinPartitioner
230209 config .Producer .RequiredAcks = kafka .WaitForLocal
210+ config .Producer .Return .Errors = true
211+ config .Producer .Return .Successes = true
231212
232- if tlsConfig != nil {
233- config . Net . TLS . Config = tlsConfig
234- config . Net . TLS . Enable = true
213+ config . Net . TLS . Config , config . Net . TLS . Enable , err = getTlsConfiguration ( opts )
214+ if err != nil {
215+ return nil , err
235216 }
236- if saslUser != "" && saslPassword != "" {
237- config .Net .SASL .Enable = true
238- config . Net . SASL . User = saslUser
239- config . Net . SASL . Password = saslPassword
217+
218+ config .Net .SASL .User , config . Net . SASL . Password , config . Net . SASL . Enable , err = getSASLConfiguration ( opts )
219+ if err != nil {
220+ return nil , err
240221 }
241222
242223 // set up producer of kafka server.
243- sinkProducer , err := setupProducer (kafkaBrokers , config )
224+ glog .V (3 ).Infof ("attempting to setup kafka sink" )
225+ sinkProducer , err := kafka .NewSyncProducer (kafkaBrokers , config )
244226 if err != nil {
245227 return nil , fmt .Errorf ("Failed to setup Producer: - %v" , err )
246228 }
247229
230+ glog .V (3 ).Infof ("kafka sink setup successfully" )
248231 return & kafkaSink {
249232 producer : sinkProducer ,
250233 dataTopic : topic ,
0 commit comments