Skip to content

Commit 56fad32

Browse files
committed
out_kafka_rest: Add support for Message_Key_Field in sending messages to Kafka Rest
Signed-off-by: TechBRSavvy <[email protected]>
1 parent 7c46a40 commit 56fad32

File tree

3 files changed

+53
-5
lines changed

3 files changed

+53
-5
lines changed

plugins/out_kafka_rest/kafka.c

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ static struct flb_config_map config_map[] = {
3737
"Specify a message key. "
3838
},
3939

40+
{
41+
FLB_CONFIG_MAP_STR, "message_key_field", NULL,
42+
0, FLB_TRUE, offsetof(struct flb_kafka_rest, message_key_field),
43+
"Specify a message key field. "
44+
},
45+
4046
{
4147
FLB_CONFIG_MAP_STR, "time_key", NULL,
4248
0, FLB_TRUE, offsetof(struct flb_kafka_rest, time_key),
@@ -147,7 +153,33 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes,
147153
map_size++;
148154
}
149155

150-
if (ctx->message_key != NULL) {
156+
/*
157+
* Logic for populating Message Key in below mentioned order
158+
* - If Message_Key_Field is defined and present in the record, use it
159+
* - If Message_Key_Field is defined but not present in the record, look for Message_Key
160+
* - If Message_Key is defined, use it
161+
*/
162+
char *message_key = NULL;
163+
size_t message_key_len = 0;
164+
if (ctx->message_key_field && !message_key && val.type == MSGPACK_OBJECT_STR) {
165+
for (i = 0; i < map.via.map.size; i++) {
166+
key = map.via.map.ptr[i].key;
167+
val = map.via.map.ptr[i].val;
168+
if (key.via.str.size == ctx->message_key_field_len &&
169+
strncmp(key.via.str.ptr, ctx->message_key_field, ctx->message_key_field_len) == 0) {
170+
message_key = (char *) val.via.str.ptr;
171+
message_key_len = val.via.str.size;
172+
break;
173+
}
174+
}
175+
}
176+
177+
if (message_key == NULL && ctx->message_key != NULL) {
178+
message_key = ctx->message_key;
179+
message_key_len = ctx->message_key_len;
180+
}
181+
182+
if (message_key != NULL) {
151183
map_size++;
152184
}
153185

@@ -158,12 +190,11 @@ static flb_sds_t kafka_rest_format(const void *data, size_t bytes,
158190
msgpack_pack_int64(&mp_pck, ctx->partition);
159191
}
160192

161-
162-
if (ctx->message_key != NULL) {
193+
if (message_key != NULL) {
163194
msgpack_pack_str(&mp_pck, 3);
164195
msgpack_pack_str_body(&mp_pck, "key", 3);
165-
msgpack_pack_str(&mp_pck, ctx->message_key_len);
166-
msgpack_pack_str_body(&mp_pck, ctx->message_key, ctx->message_key_len);
196+
msgpack_pack_str(&mp_pck, message_key_len);
197+
msgpack_pack_str_body(&mp_pck, message_key, message_key_len);
167198
}
168199

169200
/* Value Map Size */

plugins/out_kafka_rest/kafka.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ struct flb_kafka_rest {
3030
char *topic;
3131
int message_key_len;
3232
char *message_key;
33+
int message_key_field_len;
34+
char *message_key_field;
3335

3436
/* HTTP Auth */
3537
char *http_user;

plugins/out_kafka_rest/kafka_conf.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,17 @@ struct flb_kafka_rest *flb_kr_conf_create(struct flb_output_instance *ins,
192192
ctx->message_key_len = 0;
193193
}
194194

195+
/* Config: Message_Key_Field */
196+
tmp = flb_output_get_property("message_key_field", ins);
197+
if (tmp) {
198+
ctx->message_key_field = flb_strdup(tmp);
199+
ctx->message_key_field_len = strlen(tmp);
200+
}
201+
else {
202+
ctx->message_key_field = NULL;
203+
ctx->message_key_field_len = 0;
204+
}
205+
195206
return ctx;
196207
}
197208

@@ -216,6 +227,10 @@ int flb_kr_conf_destroy(struct flb_kafka_rest *ctx)
216227
flb_free(ctx->message_key);
217228
}
218229

230+
if (ctx->message_key_field) {
231+
flb_free(ctx->message_key_field);
232+
}
233+
219234
flb_upstream_destroy(ctx->u);
220235
flb_free(ctx);
221236

0 commit comments

Comments
 (0)