Skip to content

Commit 742ce7f

Browse files
author
and.sergeev
committed
added benchmarks and reworked message commit ot store offset also fixed numerious bugs
1 parent 7ac3083 commit 742ce7f

File tree

10 files changed

+691
-61
lines changed

10 files changed

+691
-61
lines changed

Makefile

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,19 @@ docker-run-zoo: docker-remove-zoo
2020
docker-remove-kafka:
2121
docker rm -f kafka || true
2222

23+
docker-pull-kafka:
24+
docker pull wurstmeister/kafka
25+
2326
docker-run-kafka: docker-remove-kafka
2427
docker run -d \
2528
--net=${NETWORK} \
2629
--name=kafka \
2730
-p 9092:9092 \
2831
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
32+
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
2933
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
3034
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
31-
confluentinc/cp-kafka:5.0.0
35+
wurstmeister/kafka
3236

3337
docker-create-test-topic:
3438
docker run \
@@ -81,11 +85,15 @@ docker-remove-all: \
8185
docker-remove-zoo \
8286
docker-remove-network
8387

84-
docker-run-all: \
88+
docker-run-environment: \
8589
docker-remove-all \
8690
docker-create-network \
8791
docker-run-zoo \
88-
docker-run-kafka \
92+
docker-run-kafka
93+
94+
docker-run-all: \
95+
docker-run-environment \
96+
docker-create-network \
8997
docker-build-app \
9098
docker-run-app
9199

@@ -130,3 +138,102 @@ test-run-with-docker: tests-dep docker-run-all
130138
. venv/bin/activate && \
131139
pytest -vv && \
132140
deactivate
141+
142+
#######################################################################
143+
# Benchmarks
144+
145+
docker-create-benchmark-async-producer-topic:
146+
docker run \
147+
--net=${NETWORK} \
148+
--rm confluentinc/cp-kafka:5.0.0 \
149+
kafka-topics --create --topic async_producer_benchmark --partitions 2 --replication-factor 1 \
150+
--if-not-exists --zookeeper zookeeper:2181
151+
152+
docker-run-benchmark-async-producer-interactive: docker-build-app docker-remove-app
153+
docker run -it \
154+
-p 3301:3301 \
155+
--net ${NETWORK} \
156+
--name ${APP_NAME} \
157+
--entrypoint "tarantool" \
158+
-e KAFKA_BROKERS=kafka:9092 \
159+
${APP_IMAGE} \
160+
/opt/tarantool/benchmarks/async_producer.lua
161+
162+
docker-read-benchmark-async-producer-topic-data:
163+
docker run \
164+
--net=${NETWORK} \
165+
--rm \
166+
confluentinc/cp-kafka:5.0.0 \
167+
kafka-console-consumer --bootstrap-server kafka:9092 --topic async_producer_benchmark --from-beginning
168+
169+
docker-create-benchmark-sync-producer-topic:
170+
docker run \
171+
--net=${NETWORK} \
172+
--rm confluentinc/cp-kafka:5.0.0 \
173+
kafka-topics --create --topic sync_producer_benchmark --partitions 2 --replication-factor 1 \
174+
--if-not-exists --zookeeper zookeeper:2181
175+
176+
docker-run-benchmark-sync-producer-interactive: docker-build-app docker-remove-app
177+
docker run -it \
178+
-p 3301:3301 \
179+
--net ${NETWORK} \
180+
--name ${APP_NAME} \
181+
--entrypoint "tarantool" \
182+
-e KAFKA_BROKERS=kafka:9092 \
183+
${APP_IMAGE} \
184+
/opt/tarantool/benchmarks/sync_producer.lua
185+
186+
docker-read-benchmark-sync-producer-topic-data:
187+
docker run \
188+
--net=${NETWORK} \
189+
--rm \
190+
confluentinc/cp-kafka:5.0.0 \
191+
kafka-console-consumer --bootstrap-server kafka:9092 --topic sync_producer_benchmark --from-beginning
192+
193+
docker-create-benchmark-auto-offset-store-consumer-topic:
194+
docker run \
195+
--net=${NETWORK} \
196+
--rm confluentinc/cp-kafka:5.0.0 \
197+
kafka-topics --create --topic auto_offset_store_consumer_benchmark --partitions 2 --replication-factor 1 \
198+
--if-not-exists --zookeeper zookeeper:2181
199+
200+
docker-run-benchmark-auto-offset-store-consumer-interactive: docker-build-app docker-remove-app
201+
docker run -it \
202+
-p 3301:3301 \
203+
--net ${NETWORK} \
204+
--name ${APP_NAME} \
205+
--entrypoint "tarantool" \
206+
-e KAFKA_BROKERS=kafka:9092 \
207+
${APP_IMAGE} \
208+
/opt/tarantool/benchmarks/auto_offset_store_consumer.lua
209+
210+
docker-read-benchmark-auto-offset-store-consumer-topic-data:
211+
docker run \
212+
--net=${NETWORK} \
213+
--rm \
214+
confluentinc/cp-kafka:5.0.0 \
215+
kafka-console-consumer --bootstrap-server kafka:9092 --topic auto_offset_store_consumer_benchmark --from-beginning
216+
217+
docker-create-benchmark-manual-commit-consumer-topic:
218+
docker run \
219+
--net=${NETWORK} \
220+
--rm confluentinc/cp-kafka:5.0.0 \
221+
kafka-topics --create --topic manual_offset_store_consumer --partitions 2 --replication-factor 1 \
222+
--if-not-exists --zookeeper zookeeper:2181
223+
224+
docker-run-benchmark-manual-commit-consumer-interactive: docker-build-app docker-remove-app
225+
docker run -it \
226+
-p 3301:3301 \
227+
--net ${NETWORK} \
228+
--name ${APP_NAME} \
229+
--entrypoint "tarantool" \
230+
-e KAFKA_BROKERS=kafka:9092 \
231+
${APP_IMAGE} \
232+
/opt/tarantool/benchmarks/manual_offset_store_consumer.lua
233+
234+
docker-read-benchmark-manual-commit-consumer-topic-data:
235+
docker run \
236+
--net=${NETWORK} \
237+
--rm \
238+
confluentinc/cp-kafka:5.0.0 \
239+
kafka-console-consumer --bootstrap-server kafka:9092 --topic manual_offset_store_consumer --from-beginning

README.md

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
tnt-kafka
22
=========
3-
Full featured kafka library for Tarantool based on [rdkafka](https://github.com/edenhill/librdkafka).
3+
Full featured high performance kafka library for Tarantool based on [librdkafka](https://github.com/edenhill/librdkafka).
4+
5+
Can produce more than 80k messages per second and consume more than 130k messages per second.
6+
7+
Library was tested with librdkafka v0.11.5
48

59
# Features
610
* Kafka producer and consumer implementations.
@@ -9,24 +13,11 @@ Full featured kafka library for Tarantool based on [rdkafka](https://github.com/
913
some libraries throws lua native `error` while others throws `box.error` instead. `tnt-kafka` returns
1014
errors as strings which allows you to decide how to handle it.
1115

12-
# Known issues
13-
* Producer can use only random messages partitioning. It was done intentionally because non nil key
14-
leads to segfault.
15-
* Consumer leaves some non gc'able objects in memory after has been stopped. It was done intentionally
16-
because `rd_kafka_destroy` sometimes hangs forever.
17-
18-
# TODO
19-
* Benchmarks
20-
* Rocks package
21-
* Fix known issues
22-
* More examples
23-
* Better documentation
24-
2516
# Examples
2617

2718
## Consumer
2819

29-
### With enabled auto commit
20+
### With auto offset store
3021
```lua
3122
local fiber = require('fiber')
3223
local os = require('os')
@@ -94,7 +85,7 @@ because `rd_kafka_destroy` sometimes hangs forever.
9485
end
9586
```
9687

97-
### With multiple fibers and manual commit
88+
### With multiple fibers and manual offset store
9889
```lua
9990
local fiber = require('fiber')
10091
local os = require('os')
@@ -150,7 +141,7 @@ because `rd_kafka_destroy` sometimes hangs forever.
150141
msg:topic(), msg:partition(), msg:offset(), msg:value()
151142
))
152143

153-
local err = consumer:commit_async(msg) -- don't forget to commit processed messages
144+
local err = consumer:store_offset(msg) -- don't forget to commit processed messages
154145
if err ~= nil then
155146
print(string.format(
156147
"got error '%s' while commiting msg from topic '%s'",
@@ -287,6 +278,69 @@ because `rd_kafka_destroy` sometimes hangs forever.
287278
end
288279
```
289280

281+
# Known issues
282+
* Producer can use only random messages partitioning. It was done intentionally because non nil key
283+
leads to segfault.
284+
* Consumer leaves some non gc'able objects in memory after has been stopped. It was done intentionally
285+
because `rd_kafka_destroy` sometimes hangs forever.
286+
287+
# TODO
288+
* Rocks package
289+
* Ordered storage for offsets to prevent committing unprocessed messages
290+
* Fix known issues
291+
* More examples
292+
* Better documentation
293+
294+
# Benchmarks
295+
296+
## Producer
297+
298+
### Async
299+
300+
Result: over 80000 produced messages per second on macbook pro 2016
301+
302+
Local run in docker:
303+
```bash
304+
make docker-run-environment
305+
make docker-create-benchmark-async-producer-topic
306+
make docker-run-benchmark-async-producer-interactive
307+
```
308+
309+
### Sync
310+
311+
Result: over 50000 produced messages per second on macbook pro 2016
312+
313+
Local run in docker:
314+
```bash
315+
make docker-run-environment
316+
make docker-create-benchmark-sync-producer-topic
317+
make docker-run-benchmark-sync-producer-interactive
318+
```
319+
320+
## Consumer
321+
322+
### Auto offset store enabled
323+
324+
Result: over 130000 consumed messages per second on macbook pro 2016
325+
326+
Local run in docker:
327+
```bash
328+
make docker-run-environment
329+
make docker-create-benchmark-auto-offset-store-consumer-topic
330+
make docker-run-benchmark-auto-offset-store-consumer-interactive
331+
```
332+
333+
### Manual offset store
334+
335+
Result: over 130000 consumed messages per second on macbook pro 2016
336+
337+
Local run in docker:
338+
```bash
339+
make docker-run-environment
340+
make docker-create-benchmark-manual-commit-consumer-topic
341+
make docker-run-benchmark-manual-commit-consumer-interactive
342+
```
343+
290344
# Developing
291345

292346
## Tests

benchmarks/async_producer.lua

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
local fiber = require('fiber')
2+
local box = require('box')
3+
local os = require('os')
4+
local log = require('log')
5+
local clock = require('clock')
6+
local kafka_producer = require('tnt-kafka.producer')
7+
8+
box.cfg{}
9+
10+
box.once('init', function()
11+
box.schema.user.grant("guest", 'read,write,execute,create,drop', 'universe')
12+
end)
13+
14+
local function produce()
15+
local config, err = kafka_producer.ProducerConfig.create(
16+
{"kafka:9092"}, -- array of brokers
17+
false -- sync_producer
18+
)
19+
if err ~= nil then
20+
print(err)
21+
os.exit(1)
22+
end
23+
24+
local producer, err = kafka_producer.Producer.create(config)
25+
if err ~= nil then
26+
print(err)
27+
os.exit(1)
28+
end
29+
30+
local err = producer:start()
31+
if err ~= nil then
32+
print(err)
33+
os.exit(1)
34+
end
35+
36+
local err = producer:add_topic("async_producer_benchmark", {}) -- add topic with configuration
37+
if err ~= nil then
38+
print(err)
39+
os.exit(1)
40+
end
41+
42+
local before = clock.monotonic64()
43+
for i = 1, 10000000 do
44+
while true do
45+
local err = producer:produce_async({ -- don't wait until the message is delivered to Kafka
46+
topic = "async_producer_benchmark",
47+
value = "test_value_" .. tostring(i) -- only strings allowed
48+
})
49+
if err ~= nil then
50+
print(err)
51+
else
52+
break
53+
end
54+
end
55+
if i % 1000 == 0 then
56+
fiber.yield()
57+
end
58+
end
59+
60+
log.info("stopping")
61+
local err = producer:stop() -- always stop the producer to flush all pending messages before the app closes
62+
if err ~= nil then
63+
print(err)
64+
os.exit(1)
65+
end
66+
67+
local duration = clock.monotonic64() - before
68+
print(string.format("done benchmark for %f seconds", tonumber(duration * 1.0 / (10 ^ 9))))
69+
end
70+
71+
log.info("starting benchmark")
72+
produce()

0 commit comments

Comments
 (0)