Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions plugins/out_kafka_rest/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ static struct flb_config_map config_map[] = {
"Specify a message key. "
},

{
FLB_CONFIG_MAP_STR, "message_key_field", NULL,
0, FLB_TRUE, offsetof(struct flb_kafka_rest, message_key_field),
"Specify a message key field. "
},

{
FLB_CONFIG_MAP_STR, "time_key", NULL,
0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key),
Expand Down Expand Up @@ -100,6 +106,8 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes,
int len;
int arr_size = 0;
int map_size;
char *message_key = NULL;
size_t message_key_len = 0;
size_t s;
flb_sds_t out_buf;
char time_formatted[256];
Expand Down Expand Up @@ -146,8 +154,34 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes,
if (ctx->partition >= 0) {
map_size++;
}
message_key = NULL;
message_key_len = 0;

/*
* Logic for populating Message Key in below mentioned order
* - If Message_Key_Field is defined and present in the record, use it
* - If Message_Key_Field is defined but not present in the record, look for Message_Key
* - If Message_Key is defined, use it
*/
if (ctx->message_key_field && val.type == MSGPACK_OBJECT_STR) {
for (i = 0; i < map.via.map.size; i++) {
key = map.via.map.ptr[i].key;
val = map.via.map.ptr[i].val;
if (key.via.str.size == ctx->message_key_field_len &&
strncmp(key.via.str.ptr, ctx->message_key_field, ctx->message_key_field_len) == 0) {
message_key = (char *) val.via.str.ptr;
message_key_len = val.via.str.size;
break;
}
}
}

if (message_key == NULL && ctx->message_key != NULL) {
message_key = ctx->message_key;
message_key_len = ctx->message_key_len;
}

if (ctx->message_key != NULL) {
if (message_key != NULL) {
map_size++;
}

Expand All @@ -158,12 +192,11 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes,
msgpack_pack_int64(&mp_pck, ctx->partition);
}


if (ctx->message_key != NULL) {
if (message_key != NULL) {
msgpack_pack_str(&mp_pck, 3);
msgpack_pack_str_body(&mp_pck, "key", 3);
msgpack_pack_str(&mp_pck, ctx->message_key_len);
msgpack_pack_str_body(&mp_pck, ctx->message_key, ctx->message_key_len);
msgpack_pack_str(&mp_pck, message_key_len);
msgpack_pack_str_body(&mp_pck, message_key, message_key_len);
}

/* Value Map Size */
Expand Down
2 changes: 2 additions & 0 deletions plugins/out_kafka_rest/kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ struct flb_kafka_rest {
char *topic;
int message_key_len;
char *message_key;
int message_key_field_len;
char *message_key_field;

/* HTTP Auth */
char *http_user;
Expand Down
15 changes: 15 additions & 0 deletions plugins/out_kafka_rest/kafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,17 @@ struct flb_kafka_rest *flb_kr_conf_create(struct flb_output_instance *ins,
ctx->message_key_len = 0;
}

/* Config: Message_Key_Field */
tmp = flb_output_get_property("message_key_field", ins);
if (tmp) {
ctx->message_key_field = flb_strdup(tmp);
ctx->message_key_field_len = strlen(tmp);
}
else {
ctx->message_key_field = NULL;
ctx->message_key_field_len = 0;
}

return ctx;
}

Expand All @@ -216,6 +227,10 @@ int flb_kr_conf_destroy(struct flb_kafka_rest *ctx)
flb_free(ctx->message_key);
}

if (ctx->message_key_field) {
flb_free(ctx->message_key_field);
}

flb_upstream_destroy(ctx->u);
flb_free(ctx);

Expand Down