Skip to content

Commit 5ed0494

Browse files
author
and.sergeev
committed
added code from luardkafka project and docker test stand
1 parent e7928f0 commit 5ed0494

File tree

10 files changed

+679
-0
lines changed

10 files changed

+679
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea

Makefile

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
# Docker test stand for tnt-kafka: zookeeper + kafka broker + the app container.
# Typical entry point: `make docker-run-all`.

NETWORK="tnt-kafka-tests"

.PHONY: docker-remove-network docker-create-network \
	docker-remove-zoo docker-run-zoo \
	docker-remove-kafka docker-run-kafka \
	docker-create-test-topic docker-read-topic-data \
	docker-build-app docker-remove-app docker-run-app docker-run-interactive \
	docker-remove-all docker-run-all

docker-remove-network:
	docker network remove ${NETWORK} || true

docker-create-network: docker-remove-network
	docker network create ${NETWORK}

docker-remove-zoo:
	docker rm -f zookeeper || true

docker-run-zoo: docker-remove-zoo
	docker run -d \
		--net=${NETWORK} \
		--name=zookeeper \
		-p 2181:2181 \
		-e ZOOKEEPER_CLIENT_PORT=2181 \
		confluentinc/cp-zookeeper:5.0.0

docker-remove-kafka:
	docker rm -f kafka || true

docker-run-kafka: docker-remove-kafka
	docker run -d \
		--net=${NETWORK} \
		--name=kafka \
		-p 9092:9092 \
		-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
		-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
		-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
		confluentinc/cp-kafka:5.0.0

docker-create-test-topic:
	docker run \
		--net=${NETWORK} \
		--rm confluentinc/cp-kafka:5.0.0 \
		kafka-topics --create --topic test_producer --partitions 1 --replication-factor 1 \
		--if-not-exists --zookeeper zookeeper:2181

docker-read-topic-data:
	docker run \
		--net=${NETWORK} \
		--rm \
		confluentinc/cp-kafka:5.0.0 \
		kafka-console-consumer --bootstrap-server kafka:9092 --topic test_producer --from-beginning

APP_NAME = kafka-test
APP_IMAGE = kafka-test-image

docker-build-app:
	docker build -t ${APP_IMAGE} -f ./docker/Dockerfile .

docker-remove-app:
	docker rm -f ${APP_NAME} || true

docker-run-app: docker-build-app docker-remove-app
	docker run -d \
		--net ${NETWORK} \
		--name ${APP_NAME} \
		-e KAFKA_BROKERS=kafka:9092 \
		${APP_IMAGE}

docker-run-interactive: docker-build-app docker-remove-app
	docker run -it \
		--net ${NETWORK} \
		--name ${APP_NAME} \
		-e KAFKA_BROKERS=kafka:9092 \
		${APP_IMAGE}

docker-remove-all: \
	docker-remove-app \
	docker-remove-kafka \
	docker-remove-zoo \
	docker-remove-network

docker-run-all: \
	docker-remove-all \
	docker-create-network \
	docker-run-zoo \
	docker-run-kafka \
	docker-build-app \
	docker-create-test-topic \
	docker-run-app

docker/Dockerfile

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
# Test-stand image: tarantool + the native librdkafka client library,
# running the producer test script on container start.
FROM tarantool/tarantool:1.x-centos7

# Native Kafka client library loaded by the Lua FFI bindings (kafka/librdkafka.lua).
RUN yum install -y librdkafka

ADD . /opt/tarantool

WORKDIR /opt/tarantool

ENTRYPOINT tarantool /opt/tarantool/tests/producer.lua

kafka/config.lua

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
local librdkafka = require 'kafka.librdkafka'
local ffi = require 'ffi'

local KafkaConfig = {}
KafkaConfig.__index = KafkaConfig

--[[
    Create a configuration object or duplicate an existing one.
    The result will be set up with the defaults.

    Please see CONFIGURATION.md for the default settings.
]]--

function KafkaConfig.create(original_config)
    local config = { cb_ = {} }
    setmetatable(config, KafkaConfig)

    -- rawset is required: KafkaConfig defines __newindex, which would treat a
    -- plain assignment as a librdkafka property set.
    if original_config and original_config.kafka_conf_ then
        -- Duplicate the underlying rd_kafka_conf_t and re-register the Lua
        -- callbacks on the copy.
        rawset(config, "kafka_conf_", librdkafka.rd_kafka_conf_dup(original_config.kafka_conf_))
        config:set_delivery_cb(original_config.cb_.dr_cb_)
        config:set_stat_cb(original_config.cb_.stat_cb_)
        config:set_error_cb(original_config.cb_.error_cb_)
        config:set_log_cb(original_config.cb_.log_cb_)
    else
        rawset(config, "kafka_conf_", librdkafka.rd_kafka_conf_new())
    end
    -- Free the native handle when its cdata is garbage collected.
    ffi.gc(config.kafka_conf_, function (kafka_conf)
        librdkafka.rd_kafka_conf_destroy(kafka_conf)
    end)

    return config
end
--[[
    Dump the configuration properties and values of `conf` to a map
    with "key", "value" pairs.
]]--

function KafkaConfig:dump()
    assert(self.kafka_conf_ ~= nil)

    local size = ffi.new("size_t[1]")
    local dump = librdkafka.rd_kafka_conf_dump(self.kafka_conf_, size)
    -- The returned array is owned by librdkafka and must be released
    -- with rd_kafka_conf_dump_free.
    ffi.gc(dump, function(d) librdkafka.rd_kafka_conf_dump_free(d, size[0]) end)

    -- Entries alternate: key at even index, its value at the next index.
    local result = {}
    for i = 0, tonumber(size[0]) - 1, 2 do
        result[ffi.string(dump[i])] = ffi.string(dump[i + 1])
    end

    return result
end
--[[
    Sets a configuration property (invoked via `config[name] = value`).
    In case of failure "error(errstr)" is raised where 'errstr'
    contains a human readable error string.
]]--

function KafkaConfig:__newindex(name, value)
    assert(self.kafka_conf_ ~= nil)

    local ERRLEN = 256
    local errbuf = ffi.new("char[?]", ERRLEN) -- cdata objects are garbage collected

    if librdkafka.rd_kafka_conf_set(self.kafka_conf_, name, tostring(value), errbuf, ERRLEN) ~= librdkafka.RD_KAFKA_CONF_OK then
        error(ffi.string(errbuf))
    end
end
--[[
    Set delivery report callback in provided conf object.
    Format: callback_function(payload, errstr)
    'payload' is the message payload
    'errstr' is nil if everything is ok, or a readable error description otherwise
]]--

function KafkaConfig:set_delivery_cb(callback)
    assert(self.kafka_conf_ ~= nil)

    if callback then
        -- Anchor the callback on self so it is not garbage collected while
        -- librdkafka still references the FFI trampoline.
        self.cb_.dr_cb_ = callback
        librdkafka.rd_kafka_conf_set_dr_cb(self.kafka_conf_,
            function(rk, payload, len, err)
                local errstr = nil
                if err ~= librdkafka.RD_KAFKA_RESP_ERR_NO_ERROR then
                    errstr = ffi.string(librdkafka.rd_kafka_err2str(err))
                end
                callback(ffi.string(payload, tonumber(len)), errstr)
            end)
    end
end
--[[
    Set statistics callback.
    The statistics callback is called from `KafkaProducer:poll` every
    `statistics.interval.ms` (needs to be configured separately).
    Format: callback_function(json)
    'json' - String containing the statistics data in JSON format
]]--

function KafkaConfig:set_stat_cb(callback)
    assert(self.kafka_conf_ ~= nil)

    if callback then
        -- Anchor the callback so the FFI trampoline stays alive.
        self.cb_.stat_cb_ = callback
        librdkafka.rd_kafka_conf_set_stats_cb(self.kafka_conf_,
            function(rk, json, json_len)
                callback(ffi.string(json, json_len))
                return 0 -- librdkafka will immediately free the 'json' pointer.
            end)
    end
end
--[[
    Set error callback.
    The error callback is used by librdkafka to signal critical errors
    back to the application.
    Format: callback_function(err_numb, reason)
]]--

function KafkaConfig:set_error_cb(callback)
    assert(self.kafka_conf_ ~= nil)

    if callback then
        -- Anchor the callback so the FFI trampoline stays alive.
        self.cb_.error_cb_ = callback
        librdkafka.rd_kafka_conf_set_error_cb(self.kafka_conf_,
            function(rk, err, reason)
                callback(tonumber(err), ffi.string(reason))
            end)
    end
end
--[[
    Set logger callback.
    The default is to print to stderr.
    Alternatively the application may provide its own logger callback.
    Or pass 'callback' as nil to disable logging.
    Format: callback_function(level, fac, buf)
]]--

function KafkaConfig:set_log_cb(callback)
    assert(self.kafka_conf_ ~= nil)

    if callback then
        -- Anchor the callback so the FFI trampoline stays alive.
        self.cb_.log_cb_ = callback
        librdkafka.rd_kafka_conf_set_log_cb(self.kafka_conf_,
            function(rk, level, fac, buf)
                callback(tonumber(level), ffi.string(fac), ffi.string(buf))
            end)
    end
end

return KafkaConfig

kafka/kafka.lua

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
local librdkafka = require 'kafka.librdkafka'
local KafkaTopicConfig = require 'kafka.topic_config'
local ffi = require 'ffi'

-- kafka_topic_map_ maps a raw rd_kafka_t* producer handle to the list of raw
-- topics created for it. KafkaProducer will delete all topics on destroy;
-- this is done in order to avoid destroying topics before destroying the producer.
local KafkaTopic = { kafka_topic_map_ = {} }

KafkaTopic.__index = KafkaTopic

--[[
    Creates a new topic handle for topic named 'topic_name'.
    'topic_config' is an optional configuration for the topic that will be
    used instead of the default topic configuration.
    The 'topic_config' object is reusable after this call.
    Returns the new topic handle or raises "error(errstr)" on error in which
    case 'errstr' is a human readable error message.
]]--

function KafkaTopic.create(kafka_producer, topic_name, topic_config)
    assert(kafka_producer.kafka_ ~= nil)

    local config = nil
    -- NOTE(review): the original guard checked `topic_config.topic_config_`,
    -- but the field accessed below (and the sibling KafkaConfig convention
    -- `kafka_conf_`) is `topic_conf_`, so the guard could never fire and
    -- custom topic configs were silently ignored. Fixed to `topic_conf_` —
    -- confirm against kafka/topic_config.lua.
    if topic_config and topic_config.topic_conf_ then
        -- Duplicate so the caller's config stays reusable; ownership of the
        -- copy passes to rd_kafka_topic_new, so drop our gc finalizer.
        config = KafkaTopicConfig.create(topic_config).topic_conf_
        ffi.gc(config, nil)
    end

    local rd_topic = librdkafka.rd_kafka_topic_new(kafka_producer.kafka_, topic_name, config)

    if rd_topic == nil then
        error(ffi.string(librdkafka.rd_kafka_err2str(librdkafka.rd_kafka_errno2err(ffi.errno()))))
    end

    local topic = {topic_ = rd_topic}
    setmetatable(topic, KafkaTopic)

    -- Register the raw topic so the producer can destroy it on shutdown.
    -- Initialize the per-producer list lazily instead of inserting into nil
    -- when the producer has not registered itself yet.
    local topics = KafkaTopic.kafka_topic_map_[kafka_producer.kafka_]
    if topics == nil then
        topics = {}
        KafkaTopic.kafka_topic_map_[kafka_producer.kafka_] = topics
    end
    table.insert(topics, rd_topic)

    return topic
end


--[[
    Returns the topic name
]]--

function KafkaTopic:name()
    assert(self.topic_ ~= nil)
    return ffi.string(librdkafka.rd_kafka_topic_name(self.topic_))
end

return KafkaTopic

kafka/librdkafka.lua

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
local ffi = require 'ffi'

-- FFI declarations for the subset of the librdkafka C API used by these
-- bindings; signatures mirror rdkafka.h.
ffi.cdef[[
typedef struct rd_kafka_s rd_kafka_t;
typedef struct rd_kafka_conf_s rd_kafka_conf_t;
typedef struct rd_kafka_topic_s rd_kafka_topic_t;
typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;

typedef enum rd_kafka_type_t {
    RD_KAFKA_PRODUCER,
    RD_KAFKA_CONSUMER
} rd_kafka_type_t;

typedef enum {
    RD_KAFKA_RESP_ERR__BEGIN = -200,
    RD_KAFKA_RESP_ERR_NO_ERROR = 0,
    /* ... */
} rd_kafka_resp_err_t;

typedef enum {
    RD_KAFKA_CONF_UNKNOWN = -2, /* Unknown configuration name. */
    RD_KAFKA_CONF_INVALID = -1, /* Invalid configuration value. */
    RD_KAFKA_CONF_OK = 0        /* Configuration okay */
} rd_kafka_conf_res_t;

rd_kafka_conf_t *rd_kafka_conf_new (void);
rd_kafka_conf_t *rd_kafka_conf_dup (const rd_kafka_conf_t *conf);
void rd_kafka_conf_destroy (rd_kafka_conf_t *conf);
const char **rd_kafka_conf_dump (rd_kafka_conf_t *conf, size_t *cntp);
void rd_kafka_conf_dump_free (const char **arr, size_t cnt);
rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf, const char *name, const char *value,
    char *errstr, size_t errstr_size);
void rd_kafka_conf_set_dr_cb (rd_kafka_conf_t *conf, void (*dr_cb) (rd_kafka_t *rk,
    void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque));
void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf, void (*error_cb) (rd_kafka_t *rk, int err,
    const char *reason, void *opaque));
void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf, int (*stats_cb) (rd_kafka_t *rk, char *json,
    size_t json_len, void *opaque));
void rd_kafka_conf_set_log_cb (rd_kafka_conf_t *conf, void (*log_cb) (const rd_kafka_t *rk, int level,
    const char *fac, const char *buf));

rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size);
void rd_kafka_destroy (rd_kafka_t *rk);
int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist);

rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void);
rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup (const rd_kafka_topic_conf_t *conf);
rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf, const char *name,
    const char *value, char *errstr, size_t errstr_size);
void rd_kafka_topic_conf_destroy (rd_kafka_topic_conf_t *topic_conf);
const char **rd_kafka_topic_conf_dump (rd_kafka_topic_conf_t *conf, size_t *cntp);

rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf);
const char *rd_kafka_topic_name (const rd_kafka_topic_t *rkt);
void rd_kafka_topic_destroy (rd_kafka_topic_t *rkt);

int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len,
    const void *key, size_t keylen, void *msg_opaque);

int rd_kafka_outq_len (rd_kafka_t *rk);
int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms);

int rd_kafka_wait_destroyed (int timeout_ms);

rd_kafka_resp_err_t rd_kafka_errno2err (int errnox);
const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
int rd_kafka_thread_cnt (void);
]]

-- NOTE(review): binds the versioned soname; the native library must be
-- installed on the host (see docker/Dockerfile) — confirm the ABI version
-- matches the installed librdkafka package.
local librdkafka = ffi.load("librdkafka.so.1")
return librdkafka

0 commit comments

Comments
 (0)