Skip to content

Commit a3df821

Browse files
author
and.sergeev
committed
renamed kafka module to tnt-kafka and formed README file
1 parent c619e41 commit a3df821

File tree

6 files changed

+300
-5
lines changed

6 files changed

+300
-5
lines changed

README.md

Lines changed: 296 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,296 @@
1-
# tnt-kafka
1+
tnt-kafka
2+
=========
3+
Full featured kafka library for Tarantool based on [rdkafka](https://github.com/edenhill/librdkafka).
4+
5+
# Features
6+
* Kafka producer and consumer implementations.
7+
* Fiber friendly.
8+
* Mostly errorless functions and methods. Error handling in the Tarantool ecosystem is quite a mess:
9+
some libraries throw Lua's native `error` while others throw `box.error` instead. `tnt-kafka` returns
10+
errors as strings, which allows you to decide how to handle them.
11+
12+
# Known issues
13+
* Producer can use only random message partitioning. This was done intentionally because a non-nil key
14+
leads to a segfault.
15+
* Consumer leaves some non-GC-able objects in memory after it has been stopped. This was done intentionally
16+
because `rd_kafka_destroy` sometimes hangs forever.
17+
18+
# TODO
19+
* Benchmarks
20+
* Rocks package
21+
* Fix known issues
22+
* More examples
23+
* Better documentation
24+
25+
# Examples
26+
27+
## Consumer
28+
29+
### With enabled auto commit
30+
```lua
31+
local fiber = require('fiber')
32+
local os = require('os')
33+
local kafka_consumer = require('tnt-kafka.consumer')
34+
35+
local config, err = kafka_consumer.ConsumerConfig.create(
36+
{"localhost:9092"}, -- array of brokers
37+
"test_consumer", -- consumer group
38+
true, -- enable_auto_commit
39+
{["auto.offset.reset"] = "earliest"} -- default configuration for topics
40+
)
41+
if err ~= nil then
42+
print(err)
43+
os.exit(1)
44+
end
45+
46+
config:set_option("queued.min.messages", "100000") -- set global consumer option
47+
48+
local consumer, err = kafka_consumer.Consumer.create(config)
49+
if err ~= nil then
50+
print(err)
51+
os.exit(1)
52+
end
53+
54+
local err = consumer:start()
55+
if err ~= nil then
56+
print(err)
57+
os.exit(1)
58+
end
59+
60+
local err = consumer:subscribe({"test_topic"}) -- array of topics to subscribe
61+
if err ~= nil then
62+
print(err)
63+
os.exit(1)
64+
end
65+
66+
fiber.create(function()
67+
local out, err = consumer:output()
68+
if err ~= nil then
69+
print(string.format("got fatal error '%s'", err))
70+
return
71+
end
72+
73+
while true do
74+
if out:is_closed() then
75+
return
76+
end
77+
78+
local msg = out:get()
79+
if msg ~= nil then
80+
print(string.format(
81+
"got msg with topic='%s' partition='%s' offset='%s' value='%s'",
82+
msg:topic(), msg:partition(), msg:offset(), msg:value()
83+
))
84+
end
85+
end
86+
end)
87+
88+
fiber.sleep(10)
89+
90+
local err = consumer:stop() -- always stop consumer to commit all pending offsets before app close
91+
if err ~= nil then
92+
print(err)
93+
os.exit(1)
94+
end
95+
```
96+
97+
### With multiple fibers and manual commit
98+
```lua
99+
local fiber = require('fiber')
100+
local os = require('os')
101+
local kafka_consumer = require('tnt-kafka.consumer')
102+
103+
local config, err = kafka_consumer.ConsumerConfig.create(
104+
{"localhost:9092"}, -- array of brokers
105+
"test_consumer", -- consumer group
106+
false, -- enable_auto_commit
107+
{["auto.offset.reset"] = "earliest"} -- default configuration for topics
108+
)
109+
if err ~= nil then
110+
print(err)
111+
os.exit(1)
112+
end
113+
114+
config:set_option("queued.min.messages", "100000") -- set global consumer option
115+
116+
local consumer, err = kafka_consumer.Consumer.create(config)
117+
if err ~= nil then
118+
print(err)
119+
os.exit(1)
120+
end
121+
122+
local err = consumer:start()
123+
if err ~= nil then
124+
print(err)
125+
os.exit(1)
126+
end
127+
128+
local err = consumer:subscribe({"test_topic"}) -- array of topics to subscribe
129+
if err ~= nil then
130+
print(err)
131+
os.exit(1)
132+
end
133+
134+
for i = 1, 10 do
135+
fiber.create(function()
136+
local out, err = consumer:output()
137+
if err ~= nil then
138+
print(string.format("got fatal error '%s'", err))
139+
return
140+
end
141+
while true do
142+
if out:is_closed() then
143+
return
144+
end
145+
146+
local msg = out:get()
147+
if msg ~= nil then
148+
print(string.format(
149+
"got msg with topic='%s' partition='%s' offset='%s' value='%s'",
150+
msg:topic(), msg:partition(), msg:offset(), msg:value()
151+
))
152+
153+
local err = consumer:commit_async(msg) -- don't forget to commit processed messages
154+
if err ~= nil then
155+
print(string.format(
156+
"got error '%s' while commiting msg from topic '%s'",
157+
err, msg:topic()
158+
))
159+
end
160+
end
161+
end
162+
end)
163+
end
164+
165+
fiber.sleep(10)
166+
167+
local err = consumer:stop() -- always stop consumer to commit all pending offsets before app close
168+
if err ~= nil then
169+
print(err)
170+
os.exit(1)
171+
end
172+
```
173+
174+
## Producer
175+
176+
### With single fiber and async producer
177+
178+
```lua
179+
local os = require('os')
180+
local kafka_producer = require('tnt-kafka.producer')
181+
182+
local config, err = kafka_producer.ProducerConfig.create(
183+
{"localhost:9092"}, -- array of brokers
184+
false -- sync_producer
185+
)
186+
if err ~= nil then
187+
print(err)
188+
os.exit(1)
189+
end
190+
191+
config:set_option("statistics.interval.ms", "1000") -- set global producer option
192+
config:set_stat_cb(function (payload) print("Stat Callback '".. payload.. "'") end) -- set callback for stats
193+
194+
local producer, err = kafka_producer.Producer.create(config)
195+
if err ~= nil then
196+
print(err)
197+
os.exit(1)
198+
end
199+
200+
local err = producer:start()
201+
if err ~= nil then
202+
print(err)
203+
os.exit(1)
204+
end
205+
206+
local err = producer:add_topic("test_topic", {}) -- add topic with configuration
207+
if err ~= nil then
208+
print(err)
209+
os.exit(1)
210+
end
211+
212+
for i = 1, 1000 do
213+
local err = producer:produce_async({ -- don't wait until the message is delivered to Kafka
214+
topic = "test_topic",
215+
value = "test_value" -- only strings allowed
216+
})
217+
if err ~= nil then
218+
print(err)
219+
os.exit(1)
220+
end
221+
end
222+
223+
local err = producer:stop() -- always stop the producer to send all pending messages before the app closes
224+
if err ~= nil then
225+
print(err)
226+
os.exit(1)
227+
end
228+
```
229+
230+
### With multiple fibers and sync producer
231+
```lua
232+
local fiber = require('fiber')
233+
local os = require('os')
234+
local kafka_producer = require('tnt-kafka.producer')
235+
236+
local config, err = kafka_producer.ProducerConfig.create(
237+
{"localhost:9092"}, -- array of brokers
238+
true -- sync_producer
239+
)
240+
if err ~= nil then
241+
print(err)
242+
os.exit(1)
243+
end
244+
245+
config:set_option("statistics.interval.ms", "1000") -- set global producer option
246+
config:set_stat_cb(function (payload) print("Stat Callback '".. payload.. "'") end) -- set callback for stats
247+
248+
local producer, err = kafka_producer.Producer.create(config)
249+
if err ~= nil then
250+
print(err)
251+
os.exit(1)
252+
end
253+
254+
local err = producer:start()
255+
if err ~= nil then
256+
print(err)
257+
os.exit(1)
258+
end
259+
260+
local err = producer:add_topic("test_topic", {}) -- add topic with configuration
261+
if err ~= nil then
262+
print(err)
263+
os.exit(1)
264+
end
265+
266+
for i = 1, 1000 do
267+
fiber.create(function()
268+
local message = "test_value " .. tostring(i)
269+
local err = producer:produce({ -- wait until the message is delivered to Kafka (using a channel under the hood)
270+
topic = "test_topic",
271+
value = message -- only strings allowed
272+
})
273+
if err ~= nil then
274+
print(string.format("got error '%s' while sending value '%s'", err, message))
275+
else
276+
print(string.format("successfully sent value '%s'", message))
277+
end
278+
end)
279+
end
280+
281+
fiber.sleep(10)
282+
283+
local err = producer:stop() -- always stop the producer to send all pending messages before the app closes
284+
if err ~= nil then
285+
print(err)
286+
os.exit(1)
287+
end
288+
```
289+
290+
# Developing
291+
292+
## Tests
293+
You can run Docker-based integration tests via the makefile target
294+
```bash
295+
make test-run-with-docker
296+
```

tests/consumer.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
local log = require("log")
22
local os = require("os")
33
local fiber = require('fiber')
4-
local kafka_consumer = require('kafka.consumer')
4+
local kafka_consumer = require('tnt-kafka.consumer')
55

66
local BROKERS_ADDRESS = { "kafka" }
77
local TOPIC_NAME = "test_consumer"

tests/producer.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
local os = require('os')
22
local fiber = require('fiber')
3-
local kafka_producer = require('kafka.producer')
3+
local kafka_producer = require('tnt-kafka.producer')
44

55
local BROKERS_ADDRESS = { "kafka" }
66
local TOPIC_NAME = "test_producer"

kafka/consumer.lua renamed to tnt-kafka/consumer.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
local log = require("log")
22
local ffi = require('ffi')
33
local fiber = require('fiber')
4-
local librdkafka = require('kafka.librdkafka')
4+
local librdkafka = require('tnt-kafka.librdkafka')
55

66
local ConsumerConfig = {}
77

File renamed without changes.

kafka/producer.lua renamed to tnt-kafka/producer.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
local ffi = require('ffi')
22
local log = require('log')
33
local fiber = require('fiber')
4-
local librdkafka = require('kafka.librdkafka')
4+
local librdkafka = require('tnt-kafka.librdkafka')
55

66
local ProducerConfig = {}
77

0 commit comments

Comments
 (0)