Skip to content

Commit 84b970f

Browse files
author
and.sergeev
committed
implemented new producer
1 parent 5ed0494 commit 84b970f

File tree

5 files changed

+353
-59
lines changed

5 files changed

+353
-59
lines changed

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ ADD . /opt/tarantool
66

77
WORKDIR /opt/tarantool
88

9-
ENTRYPOINT tarantool /opt/tarantool/tests/producer.lua
9+
ENTRYPOINT tarantool /opt/tarantool/tests/new_producer.lua

kafka/kafka.lua

Lines changed: 0 additions & 51 deletions
This file was deleted.

kafka/librdkafka.lua

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,28 @@ ffi.cdef[[
66
typedef struct rd_kafka_topic_s rd_kafka_topic_t;
77
typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;
88

9-
typedef enum rd_kafka_type_t {
10-
RD_KAFKA_PRODUCER,
11-
RD_KAFKA_CONSUMER
12-
} rd_kafka_type_t;
13-
149
typedef enum {
1510
RD_KAFKA_RESP_ERR__BEGIN = -200,
1611
RD_KAFKA_RESP_ERR_NO_ERROR = 0,
1712
/* ... */
1813
} rd_kafka_resp_err_t;
1914

15+
typedef struct rd_kafka_message_s {
16+
rd_kafka_resp_err_t err;
17+
rd_kafka_topic_t *rkt;
18+
int32_t partition;
19+
void *payload;
20+
size_t len;
21+
void *key;
22+
size_t key_len;
23+
int64_t offset;
24+
} rd_kafka_message_t;
25+
26+
typedef enum rd_kafka_type_t {
27+
RD_KAFKA_PRODUCER,
28+
RD_KAFKA_CONSUMER
29+
} rd_kafka_type_t;
30+
2031
typedef enum {
2132
RD_KAFKA_CONF_UNKNOWN = -2, /* Unknown configuration name. */
2233
RD_KAFKA_CONF_INVALID = -1, /* Invalid configuration value. */
@@ -30,8 +41,8 @@ ffi.cdef[[
3041
void rd_kafka_conf_dump_free (const char **arr, size_t cnt);
3142
rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf, const char *name, const char *value,
3243
char *errstr, size_t errstr_size);
33-
void rd_kafka_conf_set_dr_cb (rd_kafka_conf_t *conf, void (*dr_cb) (rd_kafka_t *rk,
34-
void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque));
44+
void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf, void (*dr_msg_cb) (rd_kafka_t *rk,
45+
const rd_kafka_message_t *rkmessage, void *opaque));
3546
void rd_kafka_conf_set_error_cb (rd_kafka_conf_t *conf, void (*error_cb) (rd_kafka_t *rk, int err,
3647
const char *reason, void *opaque));
3748
void rd_kafka_conf_set_stats_cb (rd_kafka_conf_t *conf, int (*stats_cb) (rd_kafka_t *rk, char *json,
@@ -65,7 +76,10 @@ ffi.cdef[[
6576
rd_kafka_resp_err_t rd_kafka_errno2err (int errnox);
6677
const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
6778
int rd_kafka_thread_cnt (void);
79+
80+
rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms);
6881
]]
6982

7083
local librdkafka = ffi.load("librdkafka.so.1")
7184
return librdkafka
85+

0 commit comments

Comments
 (0)