Skip to content

Commit 524eabe

Browse files
author
and.sergeev
committed
added base consumer implementation
1 parent 937e721 commit 524eabe

File tree

5 files changed

+304
-1
lines changed

5 files changed

+304
-1
lines changed

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ ADD . /opt/tarantool
66

77
WORKDIR /opt/tarantool
88

9-
ENTRYPOINT tarantool /opt/tarantool/tests/producer.lua
9+
ENTRYPOINT tarantool /opt/tarantool/tests/consumer.lua

kafka/consumer.lua

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
local ffi = require('ffi')
2+
local box = require('box')
3+
local fiber = require('fiber')
4+
local librdkafka = require('kafka.librdkafka')
5+
6+
local ConsumerConfig = {}
7+
8+
ConsumerConfig.__index = ConsumerConfig
9+
10+
function ConsumerConfig.create(brokers_list)
11+
assert(brokers_list ~= nil)
12+
13+
local config = {
14+
_brokers_list = brokers_list,
15+
_options = {},
16+
}
17+
setmetatable(config, ConsumerConfig)
18+
return config
19+
end
20+
21+
function ConsumerConfig:get_brokers_list()
22+
return self._brokers_list
23+
end
24+
25+
function ConsumerConfig:set_option(name, value)
26+
self._options[name] = value
27+
end
28+
29+
function ConsumerConfig:get_options()
30+
return self._options
31+
end
32+
33+
local ConsumerMessage = {}
34+
35+
ConsumerMessage.__index = ConsumerMessage
36+
37+
function ConsumerMessage.create(rd_message)
38+
local msg = {
39+
_rd_message = rd_message,
40+
_value = nil,
41+
_topic = nil,
42+
_partition = nil,
43+
_offset = nil,
44+
}
45+
ffi.gc(msg, function(...)
46+
librdkafka.rd_kafka_message_destroy(...)
47+
end)
48+
setmetatable(msg, ConsumerMessage)
49+
return msg
50+
end
51+
52+
function ConsumerMessage:value()
53+
if self._value == nil then
54+
self._value = ffi.string(self._rd_message.payload)
55+
end
56+
return self._value
57+
end
58+
59+
function ConsumerMessage:topic()
60+
if self._topic == nil then
61+
self._topic = ffi.string(librdkafka.rd_kafka_topic_name(self._rd_message.rkt))
62+
end
63+
return self._topic
64+
end
65+
66+
function ConsumerMessage:partition()
67+
if self._partition == nil then
68+
self._partition = 1
69+
end
70+
return self._partition
71+
end
72+
73+
function ConsumerMessage:offset()
74+
if self._offset == nil then
75+
self._offset = 1
76+
end
77+
return self._offset
78+
end
79+
80+
local Consumer = {}
81+
82+
Consumer.__index = Consumer
83+
84+
function Consumer.create(config)
85+
assert(config ~= nil)
86+
87+
local consumer = {
88+
config = config,
89+
_rd_consumer = {},
90+
_output_ch = nil,
91+
}
92+
setmetatable(consumer, Consumer)
93+
return consumer
94+
end
95+
96+
function Consumer:_get_consumer_rd_config()
97+
local rd_config = librdkafka.rd_kafka_conf_new()
98+
99+
-- FIXME: почему мы здесь получаем segfault, а в продьюсере с таким же кодом все ок?
100+
-- ffi.gc(rd_config, function (rd_config)
101+
-- librdkafka.rd_kafka_conf_destroy(rd_config)
102+
-- end)
103+
104+
local ERRLEN = 256
105+
for key, value in pairs(self.config:get_options()) do
106+
local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected
107+
if librdkafka.rd_kafka_conf_set(rd_config, key, tostring(value), errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then
108+
return nil, ffi.string(errbuf)
109+
end
110+
end
111+
112+
return rd_config, nil
113+
end
114+
115+
function Consumer:_poll()
116+
while true do
117+
local rd_message = librdkafka.rd_kafka_consumer_poll(self._rd_consumer, 1)
118+
if rd_message.err == librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then
119+
self._output_ch:put(ConsumerMessage.create(rd_message))
120+
else
121+
fiber.yield()
122+
end
123+
end
124+
end
125+
126+
jit.off(Consumer._poll)
127+
128+
function Consumer:start()
129+
local rd_config, err = self:_get_consumer_rd_config()
130+
if err ~= nil then
131+
return err
132+
end
133+
134+
local ERRLEN = 256
135+
local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected
136+
local rd_consumer = librdkafka.rd_kafka_new(librdkafka.RD_KAFKA_CONSUMER, rd_config, errbuf, ERRLEN)
137+
138+
if rd_consumer == nil then
139+
return ffi.string(errbuf)
140+
end
141+
142+
for _, broker in ipairs(self.config:get_brokers_list()) do
143+
librdkafka.rd_kafka_brokers_add(rd_consumer, broker)
144+
end
145+
146+
self._rd_consumer = rd_consumer
147+
148+
self._output_ch = fiber.channel(100)
149+
150+
self._poll_fiber = fiber.create(function()
151+
self:_poll()
152+
end)
153+
end
154+
155+
function Consumer:stop(timeout_ms)
156+
if self._rd_consumer == nil then
157+
return "'stop' method must be called only after consumer was started "
158+
end
159+
160+
if timeout_ms == nil then
161+
timeout_ms = 1000
162+
end
163+
164+
self._poll_fiber:cancel()
165+
self._output_ch:close()
166+
167+
-- FIXME: handle this error
168+
local err = librdkafka.rd_kafka_consumer_close(self._rd_consumer)
169+
170+
librdkafka.rd_kafka_destroy(self._rd_consumer)
171+
librdkafka.rd_kafka_wait_destroyed(timeout_ms)
172+
self._rd_consumer = nil
173+
174+
return nil
175+
end
176+
177+
function Consumer:subscribe(topics)
178+
if self._rd_consumer == nil then
179+
return "'add_topic' method must be called only after consumer was started "
180+
end
181+
182+
local list = librdkafka.rd_kafka_topic_partition_list_new(#topics)
183+
for _, topic in ipairs(topics) do
184+
librdkafka.rd_kafka_topic_partition_list_add(list, topic, 0)
185+
end
186+
187+
local err = nil
188+
local err_no = librdkafka.rd_kafka_subscribe(self._rd_consumer, list)
189+
if err_no ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then
190+
err = ffi.string(librdkafka.rd_kafka_err2str(err_no))
191+
end
192+
193+
librdkafka.rd_kafka_topic_partition_list_destroy(list)
194+
195+
return err
196+
end
197+
198+
function Consumer:output()
199+
if self._rd_consumer == nil then
200+
return nil, "'output' method must be called only after consumer was started "
201+
end
202+
203+
return self._output_ch, nil
204+
end
205+
206+
function Consumer:commit_async(message)
207+
if self._rd_consumer == nil then
208+
return "'commit' method must be called only after consumer was started "
209+
end
210+
211+
local err_no = librdkafka.rd_kafka_commit_message(self._rd_consumer, message._rd_message, 1)
212+
if err_no ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then
213+
return ffi.string(librdkafka.rd_kafka_err2str(err_no))
214+
end
215+
216+
return nil
217+
end
218+
219+
return {
220+
ConsumerConfig = ConsumerConfig,
221+
Consumer = Consumer,
222+
}

kafka/librdkafka.lua

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ ffi.cdef[[
2424
void *_private;
2525
} rd_kafka_message_t;
2626

27+
void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage);
28+
2729
typedef enum rd_kafka_type_t {
2830
RD_KAFKA_PRODUCER,
2931
RD_KAFKA_CONSUMER
@@ -79,6 +81,40 @@ ffi.cdef[[
7981
int rd_kafka_thread_cnt (void);
8082

8183
rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms);
84+
85+
typedef struct rd_kafka_topic_partition_s {
86+
char *topic; /**< Topic name */
87+
int32_t partition; /**< Partition */
88+
int64_t offset; /**< Offset */
89+
void *metadata; /**< Metadata */
90+
size_t metadata_size; /**< Metadata size */
91+
void *opaque; /**< Application opaque */
92+
rd_kafka_resp_err_t err; /**< Error code, depending on use. */
93+
void *_private; /**< INTERNAL USE ONLY,
94+
* INITIALIZE TO ZERO, DO NOT TOUCH */
95+
} rd_kafka_topic_partition_t;
96+
97+
typedef struct rd_kafka_topic_partition_list_s {
98+
int cnt; /**< Current number of elements */
99+
int size; /**< Current allocated size */
100+
rd_kafka_topic_partition_t *elems; /**< Element array[] */
101+
} rd_kafka_topic_partition_list_t;
102+
103+
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size);
104+
void rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rkparlist);
105+
rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add (rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition);
106+
107+
/**
108+
* @remark Only the \c .topic field is used in the supplied \p topics list,
109+
* all other fields are ignored.
110+
*/
111+
rd_kafka_resp_err_t rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics);
112+
113+
rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);
114+
rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk);
115+
116+
rd_kafka_resp_err_t rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async);
117+
rd_kafka_resp_err_t rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async);
82118
]]
83119

84120
local librdkafka = ffi.load("librdkafka.so.1")

kafka/producer.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ function Producer:start()
187187
self._poll_fiber = fiber.create(function()
188188
self:_poll()
189189
end)
190+
191+
return nil
190192
end
191193

192194
function Producer:stop(timeout_ms)
@@ -211,6 +213,8 @@ function Producer:stop(timeout_ms)
211213
librdkafka.rd_kafka_destroy(self._rd_producer)
212214
librdkafka.rd_kafka_wait_destroyed(timeout_ms)
213215
self._rd_producer = nil
216+
217+
return nil
214218
end
215219

216220
function Producer:_get_topic_rd_config(config)

tests/consumer.lua

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
local box = require('box')
2+
local fiber = require('fiber')
3+
local kafka_consumer = require('kafka.consumer')
4+
5+
box.cfg{}
6+
7+
local BROKERS_ADDRESS = { "kafka" }
8+
local TOPIC_NAME = "test_consumer"
9+
10+
local config = kafka_consumer.ConsumerConfig.create(BROKERS_ADDRESS)
11+
12+
local consumer = kafka_consumer.Consumer.create(config)
13+
14+
consumer:start()
15+
16+
consumer:subscribe({TOPIC_NAME})
17+
18+
for i = 0, 1 do
19+
fiber.create(function()
20+
while true do
21+
local out, err = consumer:output()
22+
if err ~= nil then
23+
print(string.format("got fatal error '%s'", err))
24+
return
25+
end
26+
27+
local msg = out:get()
28+
if msg ~= nil then
29+
print(string.format("got msg with topic='%s' partition='%s' offset='%s' value='%s'", msg:topic(), msg:partition(), msg:offset(), msg:value()))
30+
local err = consumer:commit(msg)
31+
if err ~= nil then
32+
print(string.format("got error '%s' while commiting msg from topic '%s'", err, msg:topic()))
33+
end
34+
end
35+
end
36+
end)
37+
end
38+
39+
fiber.sleep(2)
40+
41+
consumer:stop()

0 commit comments

Comments
 (0)