From d4bb3696e71527d562ba15ecd281fee12c4f871f Mon Sep 17 00:00:00 2001 From: TechBRSavvy Date: Wed, 9 Aug 2023 12:32:54 +0000 Subject: [PATCH] out_kafka_rest: Add support for Message_Key_Field in sending messages to Kafka Rest Signed-off-by: TechBRSavvy --- plugins/out_kafka_rest/kafka.c | 43 +++++++++++++++++++++++++---- plugins/out_kafka_rest/kafka.h | 2 ++ plugins/out_kafka_rest/kafka_conf.c | 15 ++++++++++ 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/plugins/out_kafka_rest/kafka.c b/plugins/out_kafka_rest/kafka.c index f3b6153a6f8..682a28d3b63 100644 --- a/plugins/out_kafka_rest/kafka.c +++ b/plugins/out_kafka_rest/kafka.c @@ -37,6 +37,12 @@ static struct flb_config_map config_map[] = { "Specify a message key. " }, + { + FLB_CONFIG_MAP_STR, "message_key_field", NULL, + 0, FLB_TRUE, offsetof(struct flb_kafka_rest, message_key_field), + "Specify a message key field. " + }, + { FLB_CONFIG_MAP_STR, "time_key", NULL, 0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key), @@ -100,6 +106,8 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes, int len; int arr_size = 0; int map_size; + char *message_key = NULL; + size_t message_key_len = 0; size_t s; flb_sds_t out_buf; char time_formatted[256]; @@ -146,8 +154,34 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes, if (ctx->partition >= 0) { map_size++; } + message_key = NULL; + message_key_len = 0; + + /* + * Logic for populating Message Key in below mentioned order + * - If Message_Key_Field is defined and present in the record, use it + * - If Message_Key_Field is defined but not present in the record, look for Message_Key + * - If Message_Key is defined, use it + */ + if (ctx->message_key_field && val.type == MSGPACK_OBJECT_STR) { + for (i = 0; i < map.via.map.size; i++) { + key = map.via.map.ptr[i].key; + val = map.via.map.ptr[i].val; + if (key.via.str.size == ctx->message_key_field_len && + strncmp(key.via.str.ptr, ctx->message_key_field, ctx->message_key_field_len) == 0) { + message_key = (char *) val.via.str.ptr; + message_key_len = val.via.str.size; + break; + } + } + } + + if (message_key == NULL && ctx->message_key != NULL) { + message_key = ctx->message_key; + message_key_len = ctx->message_key_len; + } - if (ctx->message_key != NULL) { + if (message_key != NULL) { map_size++; } @@ -158,12 +192,11 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes, msgpack_pack_int64(&mp_pck, ctx->partition); } - - if (ctx->message_key != NULL) { + if (message_key != NULL) { msgpack_pack_str(&mp_pck, 3); msgpack_pack_str_body(&mp_pck, "key", 3); - msgpack_pack_str(&mp_pck, ctx->message_key_len); - msgpack_pack_str_body(&mp_pck, ctx->message_key, ctx->message_key_len); + msgpack_pack_str(&mp_pck, message_key_len); + msgpack_pack_str_body(&mp_pck, message_key, message_key_len); } /* Value Map Size */ diff --git a/plugins/out_kafka_rest/kafka.h b/plugins/out_kafka_rest/kafka.h index c2d220e7d5f..77747ea67ad 100644 --- a/plugins/out_kafka_rest/kafka.h +++ b/plugins/out_kafka_rest/kafka.h @@ -30,6 +30,8 @@ struct flb_kafka_rest { char *topic; int message_key_len; char *message_key; + int message_key_field_len; + char *message_key_field; /* HTTP Auth */ char *http_user; diff --git a/plugins/out_kafka_rest/kafka_conf.c b/plugins/out_kafka_rest/kafka_conf.c index 3df50eb8bbf..129f28ee1cf 100644 --- a/plugins/out_kafka_rest/kafka_conf.c +++ b/plugins/out_kafka_rest/kafka_conf.c @@ -192,6 +192,17 @@ struct flb_kafka_rest *flb_kr_conf_create(struct flb_output_instance *ins, ctx->message_key_len = 0; } + /* Config: Message_Key_Field */ + tmp = flb_output_get_property("message_key_field", ins); + if (tmp) { + ctx->message_key_field = flb_strdup(tmp); + ctx->message_key_field_len = strlen(tmp); + } + else { + ctx->message_key_field = NULL; + ctx->message_key_field_len = 0; + } + return ctx; } @@ -216,6 +227,10 @@ int flb_kr_conf_destroy(struct flb_kafka_rest *ctx) flb_free(ctx->message_key); } + if (ctx->message_key_field) { + flb_free(ctx->message_key_field); + } + flb_upstream_destroy(ctx->u); flb_free(ctx);