Skip to content

Commit 50a4d1d

Browse files
tchronoedsiper
authored andcommitted
out_kafka: refactor some configuration parsing into a separate module
Add the flb_kafka module, which will contain common kafka code. For now only some configuration parsing was extracted from out_kafka. Signed-off-by: Thiago Padilha <[email protected]>
1 parent aa355d2 commit 50a4d1d

File tree

7 files changed

+169
-69
lines changed

7 files changed

+169
-69
lines changed

include/fluent-bit/flb_kafka.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019-2021 The Fluent Bit Authors
6+
* Copyright (C) 2015-2018 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#ifndef FLB_KAFKA_H
22+
#define FLB_KAFKA_H
23+
24+
#include <monkey/mk_core.h>
25+
#include <fluent-bit/flb_info.h>
26+
#include <fluent-bit/flb_config.h>
27+
28+
#include <rdkafka.h>
29+
30+
#define FLB_KAFKA_BROKERS "127.0.0.1"
31+
#define FLB_KAFKA_TOPIC "fluent-bit"
32+
33+
struct flb_kafka {
34+
rd_kafka_t *rk;
35+
char *brokers;
36+
};
37+
38+
rd_kafka_conf_t *flb_kafka_conf_create(struct flb_kafka *kafka,
39+
struct mk_list *properties,
40+
int with_group_id);
41+
42+
#endif

plugins/out_kafka/kafka.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ static int cb_kafka_init(struct flb_output_instance *ins,
7373
struct flb_out_kafka *ctx;
7474

7575
/* Configuration */
76-
ctx = flb_kafka_conf_create(ins, config);
76+
ctx = flb_out_kafka_create(ins, config);
7777
if (!ctx) {
7878
flb_plg_error(ins, "failed to initialize");
7979
return -1;
@@ -420,7 +420,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
420420
* issue a full retry of the data chunk.
421421
*/
422422
flb_time_sleep(1000);
423-
rd_kafka_poll(ctx->producer, 0);
423+
rd_kafka_poll(ctx->kafka.rk, 0);
424424

425425
/* Issue a re-try */
426426
queue_full_retries++;
@@ -433,7 +433,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
433433
}
434434
ctx->blocked = FLB_FALSE;
435435

436-
rd_kafka_poll(ctx->producer, 0);
436+
rd_kafka_poll(ctx->kafka.rk, 0);
437437
if (ctx->format == FLB_KAFKA_FMT_JSON) {
438438
flb_sds_destroy(s);
439439
}
@@ -499,7 +499,7 @@ static int cb_kafka_exit(void *data, struct flb_config *config)
499499
{
500500
struct flb_out_kafka *ctx = data;
501501

502-
flb_kafka_conf_destroy(ctx);
502+
flb_out_kafka_destroy(ctx);
503503
return 0;
504504
}
505505

plugins/out_kafka/kafka_config.c

Lines changed: 12 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,15 @@
2727
#include "kafka_topic.h"
2828
#include "kafka_callbacks.h"
2929

30-
struct flb_out_kafka *flb_kafka_conf_create(struct flb_output_instance *ins,
31-
struct flb_config *config)
30+
struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
31+
struct flb_config *config)
3232
{
33-
int ret;
3433
const char *tmp;
3534
char errstr[512];
3635
struct mk_list *head;
3736
struct mk_list *topics;
3837
struct flb_split_entry *entry;
3938
struct flb_out_kafka *ctx;
40-
struct flb_kv *kv;
4139

4240
/* Configuration context */
4341
ctx = flb_calloc(1, sizeof(struct flb_out_kafka));
@@ -49,55 +47,13 @@ struct flb_out_kafka *flb_kafka_conf_create(struct flb_output_instance *ins,
4947
ctx->blocked = FLB_FALSE;
5048

5149
/* rdkafka config context */
52-
ctx->conf = rd_kafka_conf_new();
50+
ctx->conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 0);
5351
if (!ctx->conf) {
5452
flb_plg_error(ctx->ins, "error creating context");
5553
flb_free(ctx);
5654
return NULL;
5755
}
5856

59-
/* rdkafka configuration parameters */
60-
ret = rd_kafka_conf_set(ctx->conf, "client.id", "fluent-bit",
61-
errstr, sizeof(errstr));
62-
if (ret != RD_KAFKA_CONF_OK) {
63-
flb_plg_error(ctx->ins, "cannot configure client.id");
64-
}
65-
66-
/* Config: Brokers */
67-
tmp = flb_output_get_property("brokers", ins);
68-
if (tmp) {
69-
ret = rd_kafka_conf_set(ctx->conf,
70-
"bootstrap.servers",
71-
tmp,
72-
errstr, sizeof(errstr));
73-
if (ret != RD_KAFKA_CONF_OK) {
74-
flb_plg_error(ctx->ins, "config: %s", errstr);
75-
flb_free(ctx);
76-
return NULL;
77-
}
78-
ctx->brokers = flb_strdup(tmp);
79-
}
80-
else {
81-
flb_plg_error(ctx->ins, "config: no brokers defined");
82-
flb_free(ctx);
83-
return NULL;
84-
}
85-
86-
/* Iterate custom rdkafka properties */
87-
mk_list_foreach(head, &ins->properties) {
88-
kv = mk_list_entry(head, struct flb_kv, _head);
89-
if (strncasecmp(kv->key, "rdkafka.", 8) == 0 &&
90-
flb_sds_len(kv->key) > 8) {
91-
92-
ret = rd_kafka_conf_set(ctx->conf, kv->key + 8, kv->val,
93-
errstr, sizeof(errstr));
94-
if (ret != RD_KAFKA_CONF_OK) {
95-
flb_plg_error(ctx->ins, "cannot configure '%s' property",
96-
kv->key + 8);
97-
}
98-
}
99-
}
100-
10157
/* Set our global opaque data (plugin context*/
10258
rd_kafka_conf_set_opaque(ctx->conf, ctx);
10359

@@ -234,12 +190,12 @@ struct flb_out_kafka *flb_kafka_conf_create(struct flb_output_instance *ins,
234190
}
235191

236192
/* Kafka Producer */
237-
ctx->producer = rd_kafka_new(RD_KAFKA_PRODUCER, ctx->conf,
193+
ctx->kafka.rk = rd_kafka_new(RD_KAFKA_PRODUCER, ctx->conf,
238194
errstr, sizeof(errstr));
239-
if (!ctx->producer) {
195+
if (!ctx->kafka.rk) {
240196
flb_plg_error(ctx->ins, "failed to create producer: %s",
241197
errstr);
242-
flb_kafka_conf_destroy(ctx);
198+
flb_out_kafka_destroy(ctx);
243199
return NULL;
244200
}
245201

@@ -280,28 +236,28 @@ struct flb_out_kafka *flb_kafka_conf_create(struct flb_output_instance *ins,
280236
}
281237
}
282238

283-
flb_plg_info(ctx->ins, "brokers='%s' topics='%s'", ctx->brokers, tmp);
239+
flb_plg_info(ctx->ins, "brokers='%s' topics='%s'", ctx->kafka.brokers, tmp);
284240
#ifdef FLB_HAVE_AVRO_ENCODER
285241
flb_plg_info(ctx->ins, "schemaID='%s' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str);
286242
#endif
287243

288244
return ctx;
289245
}
290246

291-
int flb_kafka_conf_destroy(struct flb_out_kafka *ctx)
247+
int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
292248
{
293249
if (!ctx) {
294250
return 0;
295251
}
296252

297-
if (ctx->brokers) {
298-
flb_free(ctx->brokers);
253+
if (ctx->kafka.brokers) {
254+
flb_free(ctx->kafka.brokers);
299255
}
300256

301257
flb_kafka_topic_destroy_all(ctx);
302258

303-
if (ctx->producer) {
304-
rd_kafka_destroy(ctx->producer);
259+
if (ctx->kafka.rk) {
260+
rd_kafka_destroy(ctx->kafka.rk);
305261
}
306262

307263
if (ctx->topic_key) {

plugins/out_kafka/kafka_config.h

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,14 @@
2626
#include <fluent-bit/flb_avro.h>
2727
#endif
2828

29-
#include "rdkafka.h"
29+
#include <fluent-bit/flb_kafka.h>
3030

3131
#define FLB_KAFKA_FMT_JSON 0
3232
#define FLB_KAFKA_FMT_MSGP 1
3333
#define FLB_KAFKA_FMT_GELF 2
3434
#ifdef FLB_HAVE_AVRO_ENCODER
3535
#define FLB_KAFKA_FMT_AVRO 3
3636
#endif
37-
#define FLB_KAFKA_BROKERS "127.0.0.1"
38-
#define FLB_KAFKA_TOPIC "fluent-bit"
3937
#define FLB_KAFKA_TS_KEY "@timestamp"
4038
#define FLB_KAFKA_QUEUE_FULL_RETRIES 10
4139

@@ -61,9 +59,9 @@ struct flb_kafka_topic {
6159
};
6260

6361
struct flb_out_kafka {
62+
struct flb_kafka kafka;
6463
/* Config Parameters */
6564
int format;
66-
char *brokers;
6765

6866
/* Optional topic key for routing */
6967
int topic_key_len;
@@ -102,7 +100,6 @@ struct flb_out_kafka {
102100
int queue_full_retries;
103101

104102
/* Internal */
105-
rd_kafka_t *producer;
106103
rd_kafka_conf_t *conf;
107104

108105
/* Plugin instance */
@@ -122,8 +119,8 @@ struct flb_out_kafka {
122119

123120
};
124121

125-
struct flb_out_kafka *flb_kafka_conf_create(struct flb_output_instance *ins,
126-
struct flb_config *config);
127-
int flb_kafka_conf_destroy(struct flb_out_kafka *ctx);
122+
struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
123+
struct flb_config *config);
124+
int flb_out_kafka_destroy(struct flb_out_kafka *ctx);
128125

129126
#endif

plugins/out_kafka/kafka_topic.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ struct flb_kafka_topic *flb_kafka_topic_create(char *name,
3030
rd_kafka_topic_t *tp;
3131
struct flb_kafka_topic *topic;
3232

33-
tp = rd_kafka_topic_new(ctx->producer, name, NULL);
33+
tp = rd_kafka_topic_new(ctx->kafka.rk, name, NULL);
3434
if (!tp) {
3535
flb_plg_error(ctx->ins, "failed to create topic: %s",
3636
rd_kafka_err2str(rd_kafka_last_error()));

src/CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ if(FLB_LUAJIT)
190190
)
191191
endif()
192192

193+
if(FLB_OUT_KAFKA)
194+
set(src
195+
${src}
196+
"flb_kafka.c"
197+
)
198+
endif()
199+
193200
# Link to libco
194201
set(extra_libs
195202
${extra_libs}

src/flb_kafka.c

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019-2021 The Fluent Bit Authors
6+
* Copyright (C) 2015-2018 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#include "fluent-bit/flb_config.h"
22+
#include "fluent-bit/flb_mem.h"
23+
#include "fluent-bit/flb_str.h"
24+
#include <fluent-bit/flb_kafka.h>
25+
#include <fluent-bit/flb_kv.h>
26+
27+
#include <rdkafka.h>
28+
29+
rd_kafka_conf_t *flb_kafka_conf_create(struct flb_kafka *kafka,
30+
struct mk_list *properties,
31+
int with_group_id)
32+
{
33+
struct mk_list *head;
34+
struct flb_kv *kv;
35+
const char *conf;
36+
rd_kafka_conf_t *kafka_cfg;
37+
char errstr[512];
38+
39+
kafka_cfg = rd_kafka_conf_new();
40+
if (!kafka_cfg) {
41+
flb_error("[flb_kafka] Could not initialize kafka config object");
42+
goto err;
43+
}
44+
45+
conf = flb_config_prop_get("client_id", properties);
46+
if (!conf) {
47+
conf = "fluent-bit";
48+
}
49+
if (rd_kafka_conf_set(kafka_cfg, "client.id", conf,
50+
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
51+
flb_error("[flb_kafka] cannot configure client id: %s", errstr);
52+
}
53+
54+
if (with_group_id) {
55+
conf = flb_config_prop_get("group_id", properties);
56+
if (!conf) {
57+
conf = "fluent-bit";
58+
}
59+
if (rd_kafka_conf_set(kafka_cfg, "group.id", conf,
60+
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
61+
flb_error("[flb_kafka] cannot configure group id: %s", errstr);
62+
}
63+
}
64+
65+
conf = flb_config_prop_get("brokers", properties);
66+
if (conf) {
67+
if (rd_kafka_conf_set(kafka_cfg, "bootstrap.servers", conf,
68+
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
69+
flb_error("[flb_kafka] failed to configure brokers: %s", errstr);
70+
goto err;
71+
}
72+
kafka->brokers = flb_strdup(conf);
73+
}
74+
else {
75+
flb_error("config: no brokers defined");
76+
goto err;
77+
}
78+
79+
/* Iterate custom rdkafka properties */
80+
mk_list_foreach(head, properties) {
81+
kv = mk_list_entry(head, struct flb_kv, _head);
82+
if (strncasecmp(kv->key, "rdkafka.", 8) == 0 &&
83+
flb_sds_len(kv->key) > 8) {
84+
if (rd_kafka_conf_set(kafka_cfg, kv->key + 8, kv->val,
85+
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
86+
flb_error("[flb_kafka] cannot configure '%s' property", kv->key + 8);
87+
}
88+
}
89+
}
90+
91+
return kafka_cfg;
92+
93+
err:
94+
if (kafka_cfg) {
95+
flb_free(kafka_cfg);
96+
}
97+
return NULL;
98+
}

0 commit comments

Comments
 (0)