Skip to content

Commit d4bb369

Browse files
committed
out_kafka_rest: Add support for Message_Key_Field in sending messages to Kafka Rest
Signed-off-by: TechBRSavvy <[email protected]>
1 parent 7c46a40 commit d4bb369

File tree

3 files changed

+55
-5
lines changed

3 files changed

+55
-5
lines changed

plugins/out_kafka_rest/kafka.c

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ static struct flb_config_map config_map[] = {
3737
"Specify a message key. "
3838
},
3939

40+
{
41+
FLB_CONFIG_MAP_STR, "message_key_field", NULL,
42+
0, FLB_TRUE, offsetof(struct flb_kafka_rest, message_key_field),
43+
"Specify a message key field. "
44+
},
45+
4046
{
4147
FLB_CONFIG_MAP_STR, "time_key", NULL,
4248
0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key),
@@ -100,6 +106,8 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes,
100106
int len;
101107
int arr_size = 0;
102108
int map_size;
109+
char *message_key = NULL;
110+
size_t message_key_len = 0;
103111
size_t s;
104112
flb_sds_t out_buf;
105113
char time_formatted[256];
@@ -146,8 +154,34 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes,
146154
if (ctx->partition >= 0) {
147155
map_size++;
148156
}
157+
message_key = NULL;
158+
message_key_len = 0;
159+
160+
/*
161+
* Logic for populating Message Key in below mentioned order
162+
* - If Message_Key_Field is defined and present in the record, use it
163+
* - If Message_Key_Field is defined but not present in the record, look for Message_Key
164+
* - If Message_Key is defined, use it
165+
*/
166+
if (ctx->message_key_field && val.type == MSGPACK_OBJECT_STR) {
167+
for (i = 0; i < map.via.map.size; i++) {
168+
key = map.via.map.ptr[i].key;
169+
val = map.via.map.ptr[i].val;
170+
if (key.via.str.size == ctx->message_key_field_len &&
171+
strncmp(key.via.str.ptr, ctx->message_key_field, ctx->message_key_field_len) == 0) {
172+
message_key = (char *) val.via.str.ptr;
173+
message_key_len = val.via.str.size;
174+
break;
175+
}
176+
}
177+
}
178+
179+
if (message_key == NULL && ctx->message_key != NULL) {
180+
message_key = ctx->message_key;
181+
message_key_len = ctx->message_key_len;
182+
}
149183

150-
if (ctx->message_key != NULL) {
184+
if (message_key != NULL) {
151185
map_size++;
152186
}
153187

@@ -158,12 +192,11 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes,
158192
msgpack_pack_int64(&mp_pck, ctx->partition);
159193
}
160194

161-
162-
if (ctx->message_key != NULL) {
195+
if (message_key != NULL) {
163196
msgpack_pack_str(&mp_pck, 3);
164197
msgpack_pack_str_body(&mp_pck, "key", 3);
165-
msgpack_pack_str(&mp_pck, ctx->message_key_len);
166-
msgpack_pack_str_body(&mp_pck, ctx->message_key, ctx->message_key_len);
198+
msgpack_pack_str(&mp_pck, message_key_len);
199+
msgpack_pack_str_body(&mp_pck, message_key, message_key_len);
167200
}
168201

169202
/* Value Map Size */

plugins/out_kafka_rest/kafka.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ struct flb_kafka_rest {
3030
char *topic;
3131
int message_key_len;
3232
char *message_key;
33+
int message_key_field_len;
34+
char *message_key_field;
3335

3436
/* HTTP Auth */
3537
char *http_user;

plugins/out_kafka_rest/kafka_conf.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,17 @@ struct flb_kafka_rest *flb_kr_conf_create(struct flb_output_instance *ins,
192192
ctx->message_key_len = 0;
193193
}
194194

195+
/* Config: Message_Key_Field */
196+
tmp = flb_output_get_property("message_key_field", ins);
197+
if (tmp) {
198+
ctx->message_key_field = flb_strdup(tmp);
199+
ctx->message_key_field_len = strlen(tmp);
200+
}
201+
else {
202+
ctx->message_key_field = NULL;
203+
ctx->message_key_field_len = 0;
204+
}
205+
195206
return ctx;
196207
}
197208

@@ -216,6 +227,10 @@ int flb_kr_conf_destroy(struct flb_kafka_rest *ctx)
216227
flb_free(ctx->message_key);
217228
}
218229

230+
if (ctx->message_key_field) {
231+
flb_free(ctx->message_key_field);
232+
}
233+
219234
flb_upstream_destroy(ctx->u);
220235
flb_free(ctx);
221236

0 commit comments

Comments
 (0)