Skip to content

Commit c748103

Browse files
tchronoedsiper
authored andcommitted
in_kafka: initial implementation
Signed-off-by: Thiago Padilha <[email protected]>
1 parent 50a4d1d commit c748103

File tree

9 files changed

+412
-3
lines changed

9 files changed

+412
-3
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ option(FLB_IN_FORWARD "Enable Forward input plugin" Yes)
123123
option(FLB_IN_HEALTH "Enable Health input plugin" Yes)
124124
option(FLB_IN_HTTP "Enable HTTP input plugin" Yes)
125125
option(FLB_IN_MEM "Enable Memory input plugin" Yes)
126+
option(FLB_IN_KAFKA "Enable Kafka input plugin" No)
126127
option(FLB_IN_KMSG "Enable Kernel log input plugin" Yes)
127128
option(FLB_IN_LIB "Enable library mode input plugin" Yes)
128129
option(FLB_IN_RANDOM "Enable random input plugin" Yes)
@@ -749,7 +750,7 @@ if(FLB_BACKTRACE)
749750
include_directories("${CMAKE_CURRENT_BINARY_DIR}/backtrace-prefix/include/")
750751
endif()
751752

752-
if(FLB_OUT_KAFKA)
753+
if(FLB_IN_KAFKA OR FLB_OUT_KAFKA)
753754
FLB_OPTION(RDKAFKA_BUILD_STATIC On)
754755
FLB_OPTION(RDKAFKA_BUILD_EXAMPLES Off)
755756
FLB_OPTION(RDKAFKA_BUILD_TESTS Off)

cmake/headers.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ include_directories(
2525
${CMAKE_CURRENT_BINARY_DIR}/include
2626
)
2727

28-
if(FLB_OUT_KAFKA)
28+
if(FLB_IN_KAFKA OR FLB_OUT_KAFKA)
2929
include_directories(${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_RDKAFKA}/src/)
3030
endif()
3131

include/fluent-bit/flb_kafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,6 @@ struct flb_kafka {
3838
rd_kafka_conf_t *flb_kafka_conf_create(struct flb_kafka *kafka,
3939
struct mk_list *properties,
4040
int with_group_id);
41+
rd_kafka_topic_partition_list_t *flb_kafka_parse_topics(const char *topics_str);
4142

4243
#endif

plugins/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
172172
REGISTER_IN_PLUGIN("in_node_exporter_metrics")
173173
endif()
174174

175+
REGISTER_IN_PLUGIN("in_kafka")
175176
REGISTER_IN_PLUGIN("in_fluentbit_metrics")
176177
REGISTER_IN_PLUGIN("in_emitter")
177178
REGISTER_IN_PLUGIN("in_tail")

plugins/in_kafka/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
set(src
2+
in_kafka.c
3+
)
4+
5+
FLB_PLUGIN(in_kafka "${src}" "rdkafka")
6+
target_link_libraries(flb-plugin-in_kafka -lpthread)

plugins/in_kafka/in_kafka.c

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019-2021 The Fluent Bit Authors
6+
* Copyright (C) 2015-2018 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#include <fluent-bit/flb_input_plugin.h>
22+
#include <fluent-bit/flb_config.h>
23+
#include <fluent-bit/flb_pack.h>
24+
#include <fluent-bit/flb_engine.h>
25+
#include <fluent-bit/flb_time.h>
26+
#include <fluent-bit/flb_parser.h>
27+
#include <fluent-bit/flb_error.h>
28+
#include <fluent-bit/flb_utils.h>
29+
#include <fluent-bit/flb_input_thread.h>
30+
#include <mpack/mpack.h>
31+
#include <stddef.h>
32+
#include <stdio.h>
33+
34+
#include "fluent-bit/flb_input.h"
35+
#include "fluent-bit/flb_kafka.h"
36+
#include "in_kafka.h"
37+
#include "rdkafka.h"
38+
39+
static int try_json(mpack_writer_t *writer, rd_kafka_message_t *rkm)
40+
{
41+
int root_type;
42+
char *buf = NULL;
43+
size_t bufsize;
44+
int ret;
45+
46+
ret = flb_pack_json(rkm->payload, rkm->len, &buf, &bufsize, &root_type);
47+
if (ret) {
48+
if (buf) {
49+
flb_free(buf);
50+
}
51+
return ret;
52+
}
53+
mpack_write_object_bytes(writer, buf, bufsize);
54+
flb_free(buf);
55+
return 0;
56+
}
57+
58+
static void process_message(mpack_writer_t *writer,
59+
rd_kafka_message_t *rkm)
60+
{
61+
struct flb_time t;
62+
63+
mpack_write_tag(writer, mpack_tag_array(2));
64+
65+
flb_time_get(&t);
66+
flb_time_append_to_mpack(writer, &t, 0);
67+
68+
mpack_write_tag(writer, mpack_tag_map(6));
69+
70+
mpack_write_cstr(writer, "topic");
71+
if (rkm->rkt) {
72+
mpack_write_cstr(writer, rd_kafka_topic_name(rkm->rkt));
73+
} else {
74+
mpack_write_nil(writer);
75+
}
76+
77+
mpack_write_cstr(writer, "partition");
78+
mpack_write_i32(writer, rkm->partition);
79+
80+
mpack_write_cstr(writer, "offset");
81+
mpack_write_i64(writer, rkm->offset);
82+
83+
mpack_write_cstr(writer, "error");
84+
if (rkm->err) {
85+
mpack_write_cstr(writer, rd_kafka_message_errstr(rkm));
86+
} else {
87+
mpack_write_nil(writer);
88+
}
89+
90+
mpack_write_cstr(writer, "key");
91+
if (rkm->key) {
92+
mpack_write_str(writer, rkm->key, rkm->key_len);
93+
} else {
94+
mpack_write_nil(writer);
95+
}
96+
97+
mpack_write_cstr(writer, "payload");
98+
if (rkm->payload) {
99+
if (try_json(writer, rkm)) {
100+
mpack_write_str(writer, rkm->payload, rkm->len);
101+
}
102+
} else {
103+
mpack_write_nil(writer);
104+
}
105+
106+
mpack_writer_flush_message(writer);
107+
}
108+
109+
static void in_kafka_callback(int write_fd, void *data)
110+
{
111+
struct flb_input_thread *it = data;
112+
struct flb_in_kafka_config *ctx = data - offsetof(struct flb_in_kafka_config, it);
113+
mpack_writer_t *writer = &ctx->it.writer;
114+
115+
while (!flb_input_thread_exited(it)) {
116+
rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 500);
117+
118+
if (rkm) {
119+
process_message(writer, rkm);
120+
fflush(ctx->it.write_file);
121+
rd_kafka_message_destroy(rkm);
122+
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
123+
}
124+
}
125+
}
126+
127+
/* Initialize plugin */
128+
static int in_kafka_init(struct flb_input_instance *ins,
129+
struct flb_config *config, void *data)
130+
{
131+
int ret;
132+
const char *conf;
133+
struct flb_in_kafka_config *ctx;
134+
rd_kafka_conf_t *kafka_conf = NULL;
135+
rd_kafka_topic_partition_list_t *kafka_topics = NULL;
136+
rd_kafka_resp_err_t err;
137+
char errstr[512];
138+
(void) data;
139+
140+
/* Allocate space for the configuration context */
141+
ctx = flb_malloc(sizeof(struct flb_in_kafka_config));
142+
if (!ctx) {
143+
return -1;
144+
}
145+
kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1);
146+
if (!kafka_conf) {
147+
flb_plg_error(ins, "Could not initialize kafka config object");
148+
goto init_error;
149+
}
150+
151+
ctx->kafka.rk = rd_kafka_new(RD_KAFKA_CONSUMER, kafka_conf, errstr,
152+
sizeof(errstr));
153+
154+
/* Create Kafka consumer handle */
155+
if (!ctx->kafka.rk) {
156+
flb_plg_error(ins, "Failed to create new consumer: %s", errstr);
157+
goto init_error;
158+
}
159+
160+
conf = flb_input_get_property("topics", ins);
161+
if (!conf) {
162+
flb_plg_error(ins, "config: no topics specified");
163+
goto init_error;
164+
}
165+
166+
kafka_topics = flb_kafka_parse_topics(conf);
167+
if (!kafka_topics) {
168+
flb_plg_error(ins, "Failed to parse topic list");
169+
goto init_error;
170+
}
171+
172+
if ((err = rd_kafka_subscribe(ctx->kafka.rk, kafka_topics))) {
173+
flb_plg_error(ins, "Failed to start consuming topics: %s", rd_kafka_err2str(err));
174+
goto init_error;
175+
}
176+
rd_kafka_topic_partition_list_destroy(kafka_topics);
177+
kafka_topics = NULL;
178+
179+
/* create worker thread */
180+
ret = flb_input_thread_init(&ctx->it, in_kafka_callback, &ctx->it);
181+
if (ret) {
182+
flb_errno();
183+
flb_plg_error(ins, "Could not initialize worker thread");
184+
goto init_error;
185+
}
186+
187+
/* Set the context */
188+
flb_input_set_context(ins, &ctx->it);
189+
190+
/* Collect upon data available on the pipe read fd */
191+
ret = flb_input_set_collector_event(ins,
192+
flb_input_thread_collect,
193+
ctx->it.read,
194+
config);
195+
if (ret == -1) {
196+
flb_plg_error(ins, "Could not set collector for thread dummy input plugin");
197+
goto init_error;
198+
}
199+
ctx->it.coll_fd = ret;
200+
201+
return 0;
202+
203+
init_error:
204+
if (kafka_topics) {
205+
rd_kafka_topic_partition_list_destroy(kafka_topics);
206+
}
207+
if (ctx->kafka.rk) {
208+
rd_kafka_destroy(ctx->kafka.rk);
209+
} else if (kafka_conf) {
210+
// conf is already destroyed when rd_kafka is initialized
211+
rd_kafka_conf_destroy(kafka_conf);
212+
}
213+
flb_free(ctx);
214+
215+
return -1;
216+
}
217+
218+
/* Cleanup serial input */
219+
static int in_kafka_exit(void *in_context, struct flb_config *config)
220+
{
221+
struct flb_input_thread *it;
222+
struct flb_in_kafka_config *ctx;
223+
224+
if (!in_context) {
225+
return 0;
226+
}
227+
228+
it = in_context;
229+
ctx = (in_context - offsetof(struct flb_in_kafka_config, it));
230+
flb_input_thread_destroy(it, ctx->ins);
231+
rd_kafka_destroy(ctx->kafka.rk);
232+
flb_free(ctx->kafka.brokers);
233+
flb_free(ctx);
234+
235+
return 0;
236+
}
237+
238+
/* Plugin reference */
239+
struct flb_input_plugin in_kafka_plugin = {
240+
.name = "kafka",
241+
.description = "Kafka consumer input plugin",
242+
.cb_init = in_kafka_init,
243+
.cb_pre_run = NULL,
244+
.cb_collect = flb_input_thread_collect,
245+
.cb_flush_buf = NULL,
246+
.cb_exit = in_kafka_exit
247+
};

plugins/in_kafka/in_kafka.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019-2021 The Fluent Bit Authors
6+
* Copyright (C) 2015-2018 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#ifndef FLB_IN_KAFKA_H
22+
#define FLB_IN_KAFKA_H
23+
24+
#include <fluent-bit/flb_config.h>
25+
#include <fluent-bit/flb_input.h>
26+
#include <fluent-bit/flb_input_thread.h>
27+
#include <fluent-bit/flb_kafka.h>
28+
29+
struct flb_in_kafka_config {
30+
struct flb_kafka kafka;
31+
struct flb_input_instance *ins;
32+
struct flb_input_thread it;
33+
};
34+
35+
#endif

src/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ if(FLB_LUAJIT)
190190
)
191191
endif()
192192

193-
if(FLB_OUT_KAFKA)
193+
if(FLB_IN_KAFKA OR FLB_OUT_KAFKA)
194194
set(src
195195
${src}
196196
"flb_kafka.c"

0 commit comments

Comments
 (0)