Skip to content

Commit 505396c

Browse files
author
and.sergeev
committed
consumer now working!
1 parent 0cf75a8 commit 505396c

File tree

3 files changed

+33
-9
lines changed

3 files changed

+33
-9
lines changed

kafka/consumer.lua

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,25 @@ function Consumer.create(config)
105105
return consumer
106106
end
107107

108+
function Consumer:_get_topic_rd_config(config)
109+
local rd_config = librdkafka.rd_kafka_topic_conf_new()
110+
111+
ffi.gc(rd_config, function (rd_config)
112+
librdkafka.rd_kafka_topic_conf_destroy(rd_config)
113+
end)
114+
115+
local ERRLEN = 256
116+
for key, value in pairs(config) do
117+
local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected
118+
119+
if librdkafka.rd_kafka_topic_conf_set(rd_config, key, value, errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then
120+
return nil, ffi.string(errbuf)
121+
end
122+
end
123+
124+
return rd_config, nil
125+
end
126+
108127
function Consumer:_get_consumer_rd_config()
109128
local rd_config = librdkafka.rd_kafka_conf_new()
110129

@@ -141,7 +160,6 @@ function Consumer:_get_consumer_rd_config()
141160

142161
librdkafka.rd_kafka_conf_set_consume_cb(rd_config,
143162
function(rkmessage)
144-
print(rkmessage)
145163
self._output_ch:put(ConsumerMessage.create(rkmessage))
146164
end)
147165

@@ -156,14 +174,20 @@ function Consumer:_get_consumer_rd_config()
156174
print("log", tonumber(level), ffi.string(fac), ffi.string(buf))
157175
end)
158176

177+
local rd_topic_config, err = self:_get_topic_rd_config({["auto.offset.reset"] = "earliest"})
178+
if err ~= nil then
179+
return nil, err
180+
end
181+
182+
librdkafka.rd_kafka_conf_set_default_topic_conf(rd_config, rd_topic_config)
183+
159184
return rd_config, nil
160185
end
161186

162187
function Consumer:_poll()
163188
while true do
164-
librdkafka.rd_kafka_poll(self._rd_consumer, 10)
165-
local rd_message = librdkafka.rd_kafka_consumer_poll(self._rd_consumer, 1000)
166-
print(rd_message)
189+
librdkafka.rd_kafka_poll(self._rd_consumer, 1)
190+
local rd_message = librdkafka.rd_kafka_consumer_poll(self._rd_consumer, 1)
167191
if rd_message ~= nil and rd_message.err ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then
168192
-- FIXME: properly log this
169193
print(ffi.string(librdkafka.rd_kafka_err2str(rd_message.err)))
@@ -238,9 +262,7 @@ function Consumer:subscribe(topics)
238262

239263
local list = librdkafka.rd_kafka_topic_partition_list_new(#topics)
240264
for _, topic in ipairs(topics) do
241-
print(topic, librdkafka.RD_KAFKA_PARTITION_UA)
242-
-- librdkafka.rd_kafka_topic_partition_list_add(list, topic, librdkafka.RD_KAFKA_PARTITION_UA)
243-
librdkafka.rd_kafka_topic_partition_list_add(list, topic, 0)
265+
librdkafka.rd_kafka_topic_partition_list_add(list, topic, librdkafka.RD_KAFKA_PARTITION_UA)
244266
end
245267

246268
local err = nil

kafka/librdkafka.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ ffi.cdef[[
8686

8787
rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms);
8888

89+
void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf);
90+
8991
typedef struct rd_kafka_topic_partition_s {
9092
char *topic; /**< Topic name */
9193
int32_t partition; /**< Partition */

tests/consumer.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ box.cfg{}
88
local BROKERS_ADDRESS = { "kafka" }
99
local TOPIC_NAME = "test_producer"
1010

11-
local config = kafka_consumer.ConsumerConfig.create(BROKERS_ADDRESS, "test_consumer5", false)
11+
local config = kafka_consumer.ConsumerConfig.create(BROKERS_ADDRESS, "test_consumer6", false)
1212

1313
local consumer = kafka_consumer.Consumer.create(config)
1414

@@ -36,7 +36,7 @@ for i = 0, 1 do
3636
local msg = out:get()
3737
if msg ~= nil then
3838
print(string.format("got msg with topic='%s' partition='%s' offset='%s' value='%s'", msg:topic(), msg:partition(), msg:offset(), msg:value()))
39-
local err = consumer:commit(msg)
39+
local err = consumer:commit_async(msg)
4040
if err ~= nil then
4141
print(string.format("got error '%s' while commiting msg from topic '%s'", err, msg:topic()))
4242
end

0 commit comments

Comments
 (0)