Skip to content

Commit 0cf75a8

Browse files
author
and.sergeev
committed
freeze not working consumer based on new consumer api of rdlibkafka
1 parent 524eabe commit 0cf75a8

File tree

3 files changed

+88
-14
lines changed

3 files changed

+88
-14
lines changed

kafka/consumer.lua

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ local ConsumerConfig = {}
77

88
ConsumerConfig.__index = ConsumerConfig
99

10-
function ConsumerConfig.create(brokers_list)
10+
function ConsumerConfig.create(brokers_list, consumer_group, enable_auto_commit)
1111
assert(brokers_list ~= nil)
12+
assert(consumer_group ~= nil)
13+
assert(enable_auto_commit ~= nil)
1214

1315
local config = {
1416
_brokers_list = brokers_list,
17+
_consumer_group = consumer_group,
18+
_enable_auto_commit = enable_auto_commit,
1519
_options = {},
1620
}
1721
setmetatable(config, ConsumerConfig)
@@ -22,6 +26,14 @@ function ConsumerConfig:get_brokers_list()
2226
return self._brokers_list
2327
end
2428

29+
function ConsumerConfig:get_consumer_group()
30+
return self._consumer_group
31+
end
32+
33+
function ConsumerConfig:get_enable_auto_commit()
34+
return self._enable_auto_commit
35+
end
36+
2537
function ConsumerConfig:set_option(name, value)
2638
self._options[name] = value
2739
end
@@ -42,7 +54,7 @@ function ConsumerMessage.create(rd_message)
4254
_partition = nil,
4355
_offset = nil,
4456
}
45-
ffi.gc(msg, function(...)
57+
ffi.gc(msg._rd_message, function(...)
4658
librdkafka.rd_kafka_message_destroy(...)
4759
end)
4860
setmetatable(msg, ConsumerMessage)
@@ -102,24 +114,62 @@ function Consumer:_get_consumer_rd_config()
102114
-- end)
103115

104116
local ERRLEN = 256
117+
local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected
118+
if librdkafka.rd_kafka_conf_set(rd_config, "group.id", tostring(self.config:get_consumer_group()), errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then
119+
return nil, ffi.string(errbuf)
120+
end
121+
122+
local enable_auto_commit
123+
if self.config:get_enable_auto_commit() then
124+
enable_auto_commit = "true"
125+
else
126+
enable_auto_commit = "false"
127+
end
128+
129+
local ERRLEN = 256
130+
local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected
131+
if librdkafka.rd_kafka_conf_set(rd_config, "enable.auto.commit", enable_auto_commit, errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then
132+
return nil, ffi.string(errbuf)
133+
end
134+
105135
for key, value in pairs(self.config:get_options()) do
106136
local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected
107137
if librdkafka.rd_kafka_conf_set(rd_config, key, tostring(value), errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then
108138
return nil, ffi.string(errbuf)
109139
end
110140
end
111141

142+
librdkafka.rd_kafka_conf_set_consume_cb(rd_config,
143+
function(rkmessage)
144+
print(rkmessage)
145+
self._output_ch:put(ConsumerMessage.create(rkmessage))
146+
end)
147+
148+
librdkafka.rd_kafka_conf_set_error_cb(rd_config,
149+
function(rk, err, reason)
150+
print("error", tonumber(err), ffi.string(reason))
151+
end)
152+
153+
154+
librdkafka.rd_kafka_conf_set_log_cb(rd_config,
155+
function(rk, level, fac, buf)
156+
print("log", tonumber(level), ffi.string(fac), ffi.string(buf))
157+
end)
158+
112159
return rd_config, nil
113160
end
114161

115162
function Consumer:_poll()
116163
while true do
117-
local rd_message = librdkafka.rd_kafka_consumer_poll(self._rd_consumer, 1)
118-
if rd_message.err == librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then
119-
self._output_ch:put(ConsumerMessage.create(rd_message))
120-
else
121-
fiber.yield()
164+
librdkafka.rd_kafka_poll(self._rd_consumer, 10)
165+
local rd_message = librdkafka.rd_kafka_consumer_poll(self._rd_consumer, 1000)
166+
print(rd_message)
167+
if rd_message ~= nil and rd_message.err ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then
168+
-- FIXME: properly log this
169+
print(ffi.string(librdkafka.rd_kafka_err2str(rd_message.err)))
122170
end
171+
172+
fiber.yield()
123173
end
124174
end
125175

@@ -134,13 +184,20 @@ function Consumer:start()
134184
local ERRLEN = 256
135185
local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected
136186
local rd_consumer = librdkafka.rd_kafka_new(librdkafka.RD_KAFKA_CONSUMER, rd_config, errbuf, ERRLEN)
137-
138187
if rd_consumer == nil then
139188
return ffi.string(errbuf)
140189
end
141190

191+
-- redirect all events polling to rd_kafka_consumer_poll function
192+
local err = librdkafka.rd_kafka_poll_set_consumer(rd_consumer)
193+
if err ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then
194+
return ffi.string(librdkafka.rd_kafka_err2str(err))
195+
end
196+
142197
for _, broker in ipairs(self.config:get_brokers_list()) do
143-
librdkafka.rd_kafka_brokers_add(rd_consumer, broker)
198+
if librdkafka.rd_kafka_brokers_add(rd_consumer, broker) < 1 then
199+
return "no valid brokers specified"
200+
end
144201
end
145202

146203
self._rd_consumer = rd_consumer
@@ -181,6 +238,8 @@ function Consumer:subscribe(topics)
181238

182239
local list = librdkafka.rd_kafka_topic_partition_list_new(#topics)
183240
for _, topic in ipairs(topics) do
241+
print(topic, librdkafka.RD_KAFKA_PARTITION_UA)
242+
-- librdkafka.rd_kafka_topic_partition_list_add(list, topic, librdkafka.RD_KAFKA_PARTITION_UA)
184243
librdkafka.rd_kafka_topic_partition_list_add(list, topic, 0)
185244
end
186245

kafka/librdkafka.lua

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
local ffi = require 'ffi'
22

33
ffi.cdef[[
4+
static const int32_t RD_KAFKA_PARTITION_UA = ((int32_t)-1);
5+
46
typedef struct rd_kafka_s rd_kafka_t;
57
typedef struct rd_kafka_conf_s rd_kafka_conf_t;
68
typedef struct rd_kafka_topic_s rd_kafka_topic_t;
@@ -46,6 +48,8 @@ ffi.cdef[[
4648
char *errstr, size_t errstr_size);
4749
void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf, void (*dr_msg_cb) (rd_kafka_t *rk,
4850
const rd_kafka_message_t *rkmessage, void *opaque));
51+
void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf, void (*consume_cb) (rd_kafka_message_t *rkmessage,
52+
void *opaque));
4953
void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf, void (*error_cb) (rd_kafka_t *rk, int err,
5054
const char *reason, void *opaque));
5155
void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf, int (*stats_cb) (rd_kafka_t *rk, char *json,
@@ -115,6 +119,8 @@ ffi.cdef[[
115119

116120
rd_kafka_resp_err_t rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async);
117121
rd_kafka_resp_err_t rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async);
122+
123+
rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk);
118124
]]
119125

120126
local librdkafka = ffi.load("librdkafka.so.1")

tests/consumer.lua

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,28 @@
11
local box = require('box')
2+
local os = require("os")
23
local fiber = require('fiber')
34
local kafka_consumer = require('kafka.consumer')
45

56
box.cfg{}
67

78
local BROKERS_ADDRESS = { "kafka" }
8-
local TOPIC_NAME = "test_consumer"
9+
local TOPIC_NAME = "test_producer"
910

10-
local config = kafka_consumer.ConsumerConfig.create(BROKERS_ADDRESS)
11+
local config = kafka_consumer.ConsumerConfig.create(BROKERS_ADDRESS, "test_consumer5", false)
1112

1213
local consumer = kafka_consumer.Consumer.create(config)
1314

14-
consumer:start()
15+
local err = consumer:start()
16+
if err ~= nil then
17+
print(err)
18+
os.exit(1)
19+
end
1520

16-
consumer:subscribe({TOPIC_NAME})
21+
local err = consumer:subscribe({TOPIC_NAME})
22+
if err ~= nil then
23+
print(err)
24+
os.exit(1)
25+
end
1726

1827
for i = 0, 1 do
1928
fiber.create(function()
@@ -36,6 +45,6 @@ for i = 0, 1 do
3645
end)
3746
end
3847

39-
fiber.sleep(2)
48+
fiber.sleep(10)
4049

4150
consumer:stop()

0 commit comments

Comments
 (0)