Skip to content

Commit 96af16c

Browse files
committed
out_kafka: Add dynamic/static headers support
Signed-off-by: difrin <[email protected]>
1 parent da8aea7 commit 96af16c

File tree

2 files changed

+117
-8
lines changed

2 files changed

+117
-8
lines changed

plugins/out_kafka/kafka.c

Lines changed: 114 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "kafka_config.h"
2727
#include "kafka_topic.h"
2828

29+
2930
void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
3031
void *opaque)
3132
{
@@ -73,6 +74,8 @@ static int cb_kafka_init(struct flb_output_instance *ins,
7374
{
7475
struct flb_out_kafka *ctx;
7576

77+
flb_plg_info(ins, "Starting kafka output init");
78+
7679
/* Configuration */
7780
ctx = flb_out_kafka_create(ins, config);
7881
if (!ctx) {
@@ -85,6 +88,26 @@ static int cb_kafka_init(struct flb_output_instance *ins,
8588
return 0;
8689
}
8790

91+
int flb_msgpack_get_map_value(struct flb_out_kafka *ctx, msgpack_object *map, const char *key, msgpack_object **val)
92+
{
93+
if (map->type != MSGPACK_OBJECT_MAP) {
94+
flb_error("[flb_msgpack_get_map_value] Map expected");
95+
return -1;
96+
}
97+
98+
size_t i;
99+
for (i = 0; i < map->via.map.size; ++i) {
100+
if (map->via.map.ptr[i].key.type == MSGPACK_OBJECT_STR &&
101+
strncmp(map->via.map.ptr[i].key.via.str.ptr, key, map->via.map.ptr[i].key.via.str.size) == 0) {
102+
*val = &map->via.map.ptr[i].val;
103+
flb_debug("key matches a field in the message");
104+
return 0;
105+
}
106+
}
107+
108+
return -1; // Key not found
109+
}
110+
88111
int produce_message(struct flb_time *tm, msgpack_object *map,
89112
struct flb_out_kafka *ctx, struct flb_config *config)
90113
{
@@ -106,6 +129,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
106129
msgpack_object key;
107130
msgpack_object val;
108131
flb_sds_t s;
132+
rd_kafka_headers_t *kafka_headers = NULL;
109133

110134
#ifdef FLB_HAVE_AVRO_ENCODER
111135
// used to flag when a buffer needs to be freed for avro
@@ -155,6 +179,70 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
155179
msgpack_pack_str(&mp_pck, ctx->timestamp_key_len);
156180
msgpack_pack_str_body(&mp_pck,
157181
ctx->timestamp_key, ctx->timestamp_key_len);
182+
183+
/* Check if headers are provided in the configuration */
184+
if (ctx->headers) {
185+
186+
flb_debug("setting message headers");
187+
/* Setting headers list size */
188+
int size_headers = 0;
189+
struct mk_list *tmp;
190+
struct mk_list *head2;
191+
struct flb_config_map_val *mv;
192+
struct flb_slist_entry *hkey = NULL;
193+
struct flb_slist_entry *hval = NULL;
194+
195+
/* Calculate the number of headers */
196+
mk_list_foreach_safe(head2, tmp, ctx->headers) {
197+
size_headers++;
198+
}
199+
200+
/* Create Kafka headers object */
201+
kafka_headers = rd_kafka_headers_new(size_headers);
202+
203+
/* Add headers from configuration */
204+
flb_config_map_foreach(head2, mv, ctx->headers) {
205+
hkey = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
206+
hval = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
207+
208+
flb_debug("found header %s with value %s", hkey->str, hval->str);
209+
210+
/* Extract the message field value */
211+
char *field_name = NULL;
212+
size_t field_len = flb_sds_len(hval->str);
213+
field_name = flb_malloc(field_len); // Allocate memory for field name
214+
if (!field_name) {
215+
flb_errno();
216+
return -1;
217+
}
218+
219+
memcpy(field_name, hval->str, field_len); // Copy field name
220+
/* Check if the header value is a message field */
221+
if (field_name[0] == '<' ) {
222+
flb_debug("header %s is part of the msg, field name : %s", hkey->str, hval->str);
223+
msgpack_object *field_value = NULL;
224+
if (flb_msgpack_get_map_value(ctx, map, field_name + 1, &field_value) == 0 &&
225+
field_value->type == MSGPACK_OBJECT_STR) {
226+
rd_kafka_header_add(kafka_headers, hkey->str, flb_sds_len(hkey->str),
227+
field_value->via.str.ptr, field_value->via.str.size);
228+
}
229+
else {
230+
flb_warn("Field '%s' not found or not a string value", field_name);
231+
}
232+
233+
flb_free(field_name); // Free allocated memory
234+
}
235+
else {
236+
/* Static header value */
237+
rd_kafka_header_add(kafka_headers, hkey->str, flb_sds_len(hkey->str),
238+
hval->str, flb_sds_len(hval->str));
239+
}
240+
}
241+
}
242+
else {
243+
flb_debug("no header set");
244+
}
245+
158246
switch (ctx->timestamp_format) {
159247
case FLB_JSON_DATE_DOUBLE:
160248
msgpack_pack_double(&mp_pck, flb_time_to_double(tm));
@@ -221,7 +309,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
221309
if (ctx->dynamic_topic) {
222310
/* Only if default topic is set and this topicname is not set for this message */
223311
if (strncmp(topic->name, flb_kafka_topic_default(ctx)->name, val.via.str.size) == 0 &&
224-
(strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) {
312+
(strncmp(topic->name, val.via.str.ptr, val.via.str.size) != 0) ) {
225313
if (memchr(val.via.str.ptr, ',', val.via.str.size)) {
226314
/* Don't allow commas in kafkatopic name */
227315
flb_warn("',' not allowed in dynamic_kafka topic names");
@@ -392,12 +480,22 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
392480
return FLB_RETRY;
393481
}
394482

395-
ret = rd_kafka_produce(topic->tp,
396-
RD_KAFKA_PARTITION_UA,
397-
RD_KAFKA_MSG_F_COPY,
398-
out_buf, out_size,
399-
message_key, message_key_len,
400-
ctx);
483+
rd_kafka_resp_err_t err = rd_kafka_producev(ctx->kafka.rk,
484+
RD_KAFKA_V_TOPIC(rd_kafka_topic_name(topic->tp)),
485+
RD_KAFKA_V_HEADERS(kafka_headers),
486+
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
487+
RD_KAFKA_V_VALUE(out_buf, out_size),
488+
RD_KAFKA_V_KEY(message_key, message_key_len),
489+
RD_KAFKA_V_END);
490+
491+
492+
493+
494+
if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
495+
flb_plg_info(ctx->ins, "Sending message completed");
496+
ret = 0;
497+
}
498+
401499

402500
if (ret == -1) {
403501
flb_error(
@@ -455,7 +553,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
455553
AVRO_FREE(avro_fast_buffer, out_buf)
456554
}
457555
#endif
458-
556+
459557
msgpack_sbuffer_destroy(&mp_sbuf);
460558
return FLB_OK;
461559
}
@@ -622,6 +720,12 @@ static struct flb_config_map config_map[] = {
622720
0, FLB_FALSE, 0,
623721
"Set the kafka topics, delimited by commas."
624722
},
723+
{
724+
FLB_CONFIG_MAP_SLIST_1, "header", NULL,
725+
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_kafka, headers),
726+
"Add a kafka message header key/value pair. Multiple headers can be set"
727+
},
728+
625729
{
626730
FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
627731
0, FLB_FALSE, 0,
@@ -647,6 +751,8 @@ static struct flb_config_map config_map[] = {
647751
{0}
648752
};
649753

754+
755+
650756
struct flb_output_plugin out_kafka_plugin = {
651757
.name = "kafka",
652758
.description = "Kafka",

plugins/out_kafka/kafka_config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ struct flb_out_kafka {
8686
/* Head of defined topics by configuration */
8787
struct mk_list topics;
8888

89+
/* Headers map defined by configuration*/
90+
struct mk_list *headers;
91+
8992
/*
9093
* Blocked Status: since rdkafka have it own buffering queue, there is a
9194
* chance that the queue becomes full, when that happens our default

0 commit comments

Comments
 (0)