1+ local log = require (" log" )
12local ffi = require (' ffi' )
2- local box = require (' box' )
33local fiber = require (' fiber' )
44local librdkafka = require (' kafka.librdkafka' )
55
66local ConsumerConfig = {}
77
88ConsumerConfig .__index = ConsumerConfig
99
10- function ConsumerConfig .create (brokers_list , consumer_group , enable_auto_commit )
11- assert (brokers_list ~= nil )
12- assert (consumer_group ~= nil )
13- assert (enable_auto_commit ~= nil )
10+ function ConsumerConfig .create (brokers_list , consumer_group , enable_auto_commit , default_topic_opts )
11+ if brokers_list == nil then
12+ return nil , " brokers list must not be nil"
13+ end
14+ if consumer_group == nil then
15+ return nil , " consumer group must not be nil"
16+ end
17+ if enable_auto_commit == nil then
18+ return nil , " enable_auto_commit flag must not be nil"
19+ end
20+
21+ if default_topic_opts == nil then
22+ return nil , " default_topic_opts must not be nil"
23+ end
1424
1525 local config = {
1626 _brokers_list = brokers_list ,
1727 _consumer_group = consumer_group ,
1828 _enable_auto_commit = enable_auto_commit ,
1929 _options = {},
30+ _topic_opts = default_topic_opts ,
2031 }
2132 setmetatable (config , ConsumerConfig )
22- return config
33+ return config , nil
2334end
2435
2536function ConsumerConfig :get_brokers_list ()
@@ -42,6 +53,10 @@ function ConsumerConfig:get_options()
4253 return self ._options
4354end
4455
56+ function ConsumerConfig :get_default_topic_options ()
57+ return self ._topic_opts
58+ end
59+
4560local ConsumerMessage = {}
4661
4762ConsumerMessage .__index = ConsumerMessage
@@ -94,23 +109,26 @@ local Consumer = {}
94109Consumer .__index = Consumer
95110
96111function Consumer .create (config )
97- assert (config ~= nil )
112+ if config == nil then
113+ return nil , " config must not be nil"
114+ end
98115
99116 local consumer = {
100117 config = config ,
101118 _rd_consumer = {},
102119 _output_ch = nil ,
103120 }
104121 setmetatable (consumer , Consumer )
105- return consumer
122+ return consumer , nil
106123end
107124
108125function Consumer :_get_topic_rd_config (config )
109126 local rd_config = librdkafka .rd_kafka_topic_conf_new ()
110127
111- ffi .gc (rd_config , function (rd_config )
112- librdkafka .rd_kafka_topic_conf_destroy (rd_config )
113- end )
128+ -- FIXME: sometimes got segfault here
129+ -- ffi.gc(rd_config, function (rd_config)
130+ -- librdkafka.rd_kafka_topic_conf_destroy(rd_config)
131+ -- end)
114132
115133 local ERRLEN = 256
116134 for key , value in pairs (config ) do
127145function Consumer :_get_consumer_rd_config ()
128146 local rd_config = librdkafka .rd_kafka_conf_new ()
129147
130- -- FIXME: почему мы здесь получаем segfault, а в продьюсере с таким же кодом все ок ?
148+ -- FIXME: why we got segfault here ?
131149-- ffi.gc(rd_config, function (rd_config)
132150-- librdkafka.rd_kafka_conf_destroy(rd_config)
133151-- end)
@@ -158,23 +176,18 @@ function Consumer:_get_consumer_rd_config()
158176 end
159177 end
160178
161- librdkafka .rd_kafka_conf_set_consume_cb (rd_config ,
162- function (rkmessage )
163- self ._output_ch :put (ConsumerMessage .create (rkmessage ))
164- end )
165-
166179 librdkafka .rd_kafka_conf_set_error_cb (rd_config ,
167180 function (rk , err , reason )
168- print ( " error" , tonumber (err ), ffi .string (reason ))
181+ log . error ( " rdkafka error code=%d reason=%s " , tonumber (err ), ffi .string (reason ))
169182 end )
170183
171184
172185 librdkafka .rd_kafka_conf_set_log_cb (rd_config ,
173186 function (rk , level , fac , buf )
174- print ( " log " , tonumber ( level ) , ffi .string (fac ), ffi .string (buf ))
187+ log . info ( " %s - %s " , ffi .string (fac ), ffi .string (buf ))
175188 end )
176189
177- local rd_topic_config , err = self :_get_topic_rd_config ({[ " auto.offset.reset " ] = " earliest " } )
190+ local rd_topic_config , err = self :_get_topic_rd_config (self . config : get_default_topic_options () )
178191 if err ~= nil then
179192 return nil , err
180193 end
@@ -186,14 +199,18 @@ end
186199
187200function Consumer :_poll ()
188201 while true do
189- librdkafka .rd_kafka_poll (self ._rd_consumer , 1 )
190- local rd_message = librdkafka .rd_kafka_consumer_poll (self ._rd_consumer , 1 )
191- if rd_message ~= nil and rd_message .err ~= librdkafka .RD_KAFKA_RESP_ERR_NO_ERROR then
192- -- FIXME: properly log this
193- print (ffi .string (librdkafka .rd_kafka_err2str (rd_message .err )))
202+ -- lower timeout value can lead to broken payload
203+ librdkafka .rd_kafka_poll (self ._rd_consumer , 10 )
204+ local rd_message = librdkafka .rd_kafka_consumer_poll (self ._rd_consumer , 10 )
205+ if rd_message ~= nil then
206+ if rd_message .err == librdkafka .RD_KAFKA_RESP_ERR_NO_ERROR then
207+ self ._output_ch :put (ConsumerMessage .create (rd_message ))
208+ else
209+ -- FIXME: properly log this
210+ log .error (" rdkafka poll: %s" , ffi .string (librdkafka .rd_kafka_err2str (rd_message .err )))
211+ end
194212 end
195-
196- fiber .yield ()
213+ fiber .sleep (0.01 )
197214 end
198215end
199216
@@ -212,12 +229,6 @@ function Consumer:start()
212229 return ffi .string (errbuf )
213230 end
214231
215- -- redirect all events polling to rd_kafka_consumer_poll function
216- local err = librdkafka .rd_kafka_poll_set_consumer (rd_consumer )
217- if err ~= librdkafka .RD_KAFKA_RESP_ERR_NO_ERROR then
218- return ffi .string (librdkafka .rd_kafka_err2str (err ))
219- end
220-
221232 for _ , broker in ipairs (self .config :get_brokers_list ()) do
222233 if librdkafka .rd_kafka_brokers_add (rd_consumer , broker ) < 1 then
223234 return " no valid brokers specified"
@@ -246,10 +257,15 @@ function Consumer:stop(timeout_ms)
246257 self ._output_ch :close ()
247258
248259 -- FIXME: handle this error
249- local err = librdkafka .rd_kafka_consumer_close (self ._rd_consumer )
260+ local err_no = librdkafka .rd_kafka_consumer_close (self ._rd_consumer )
261+ if err_no ~= librdkafka .RD_KAFKA_RESP_ERR_NO_ERROR then
262+ return ffi .string (librdkafka .rd_kafka_err2str (err_no ))
263+ end
264+
265+ -- FIXME: sometimes rd_kafka_destroy hangs forever(
266+ -- librdkafka.rd_kafka_destroy(self._rd_consumer)
267+ -- librdkafka.rd_kafka_wait_destroyed(timeout_ms)
250268
251- librdkafka .rd_kafka_destroy (self ._rd_consumer )
252- librdkafka .rd_kafka_wait_destroyed (timeout_ms )
253269 self ._rd_consumer = nil
254270
255271 return nil
@@ -271,13 +287,14 @@ function Consumer:subscribe(topics)
271287 err = ffi .string (librdkafka .rd_kafka_err2str (err_no ))
272288 end
273289
290+
274291 librdkafka .rd_kafka_topic_partition_list_destroy (list )
275292
276293 return err
277294end
278295
279296function Consumer :output ()
280- if self ._rd_consumer == nil then
297+ if self ._output_ch == nil then
281298 return nil , " 'output' method must be called only after consumer was started "
282299 end
283300
0 commit comments