Skip to content

Commit 932a967

Browse files
committed
in_elasticsearch: reimplement tag_key lookup with record accessor api
Replace manual key lookup with record accessor pattern for better performance and support for nested/complex key patterns. Signed-off-by: Eduardo Silva <[email protected]>
1 parent 964ebca commit 932a967

File tree

3 files changed

+39
-57
lines changed

3 files changed

+39
-57
lines changed

plugins/in_elasticsearch/in_elasticsearch.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <fluent-bit/flb_input.h>
2626
#include <fluent-bit/flb_utils.h>
2727
#include <fluent-bit/flb_log_event_encoder.h>
28+
#include <fluent-bit/flb_record_accessor.h>
2829

2930
#include <monkey/monkey.h>
3031
#include <fluent-bit/http_server/flb_http_server.h>
@@ -35,14 +36,15 @@
3536
struct flb_in_elasticsearch {
3637
flb_sds_t listen;
3738
flb_sds_t tcp_port;
38-
const char *tag_key;
39-
const char *meta_key;
39+
flb_sds_t tag_key;
40+
flb_sds_t meta_key;
4041
flb_sds_t hostname;
4142
flb_sds_t es_version;
4243
char cluster_name[16];
4344
char node_name[12];
4445

4546
struct flb_log_event_encoder *log_encoder;
47+
struct flb_record_accessor *ra_tag_key;
4648

4749
struct flb_input_instance *ins;
4850

plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c

Lines changed: 22 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include <fluent-bit/flb_error.h>
2323
#include <fluent-bit/flb_pack.h>
2424
#include <fluent-bit/flb_gzip.h>
25+
#include <fluent-bit/flb_record_accessor.h>
26+
#include <fluent-bit/flb_ra_key.h>
2527

2628
#include <monkey/monkey.h>
2729
#include <monkey/mk_core.h>
@@ -245,67 +247,32 @@ static int send_response(struct in_elasticsearch_bulk_conn *conn, int http_statu
245247
/* implements functionality to get tag from key in record */
246248
static flb_sds_t tag_key(struct flb_in_elasticsearch *ctx, msgpack_object *map)
247249
{
248-
size_t map_size = map->via.map.size;
249-
msgpack_object_kv *kv;
250-
msgpack_object key;
251-
msgpack_object val;
252-
char *key_str = NULL;
253-
char *val_str = NULL;
254-
size_t key_str_size = 0;
255-
size_t val_str_size = 0;
256-
int j;
257-
int check = FLB_FALSE;
258-
int found = FLB_FALSE;
259-
flb_sds_t tag;
260-
261-
kv = map->via.map.ptr;
250+
flb_sds_t tag = NULL;
251+
struct flb_ra_value *ra_val;
262252

263-
for(j=0; j < map_size; j++) {
264-
check = FLB_FALSE;
265-
found = FLB_FALSE;
266-
key = (kv+j)->key;
267-
if (key.type == MSGPACK_OBJECT_BIN) {
268-
key_str = (char *) key.via.bin.ptr;
269-
key_str_size = key.via.bin.size;
270-
check = FLB_TRUE;
271-
}
272-
if (key.type == MSGPACK_OBJECT_STR) {
273-
key_str = (char *) key.via.str.ptr;
274-
key_str_size = key.via.str.size;
275-
check = FLB_TRUE;
276-
}
277-
278-
if (check == FLB_TRUE) {
279-
if (strncmp(ctx->tag_key, key_str, key_str_size) == 0) {
280-
val = (kv+j)->val;
281-
if (val.type == MSGPACK_OBJECT_BIN) {
282-
val_str = (char *) val.via.bin.ptr;
283-
val_str_size = val.via.str.size;
284-
found = FLB_TRUE;
285-
break;
286-
}
287-
if (val.type == MSGPACK_OBJECT_STR) {
288-
val_str = (char *) val.via.str.ptr;
289-
val_str_size = val.via.str.size;
290-
found = FLB_TRUE;
291-
break;
292-
}
293-
}
294-
}
253+
/* If no record accessor is configured, return NULL */
254+
if (!ctx->ra_tag_key) {
255+
return NULL;
295256
}
296257

297-
if (found == FLB_TRUE) {
298-
tag = flb_sds_create_len(val_str, val_str_size);
299-
if (!tag) {
300-
flb_errno();
301-
return NULL;
302-
}
303-
return tag;
258+
/* Use record accessor to get the value */
259+
ra_val = flb_ra_get_value_object(ctx->ra_tag_key, *map);
260+
if (!ra_val) {
261+
flb_plg_warn(ctx->ins, "Could not find tag_key %s in record", ctx->tag_key);
262+
return NULL;
304263
}
305264

265+
/* Convert the value to string */
266+
if (ra_val->type == FLB_RA_STRING) {
267+
tag = flb_sds_create_len(ra_val->o.via.str.ptr, ra_val->o.via.str.size);
268+
}
269+
else {
270+
flb_plg_error(ctx->ins, "tag_key %s value is not a string or binary", ctx->tag_key);
271+
}
306272

307-
flb_plg_error(ctx->ins, "Could not find tag_key %s in record", ctx->tag_key);
308-
return NULL;
273+
/* Clean up the record accessor value */
274+
flb_ra_key_value_destroy(ra_val);
275+
return tag;
309276
}
310277

311278
static int get_write_op(struct flb_in_elasticsearch *ctx, msgpack_object *map, flb_sds_t *out_write_op, size_t *out_key_size)

plugins/in_elasticsearch/in_elasticsearch_config.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,25 @@ struct flb_in_elasticsearch *in_elasticsearch_config_create(struct flb_input_ins
6767
return NULL;
6868
}
6969

70+
/* Create record accessor for tag_key if specified */
71+
if (ctx->tag_key) {
72+
ctx->ra_tag_key = flb_ra_create(ctx->tag_key, FLB_TRUE);
73+
if (!ctx->ra_tag_key) {
74+
flb_plg_error(ctx->ins, "invalid record accessor pattern for tag_key: %s", ctx->tag_key);
75+
in_elasticsearch_config_destroy(ctx);
76+
return NULL;
77+
}
78+
}
7079

7180
return ctx;
7281
}
7382

7483
int in_elasticsearch_config_destroy(struct flb_in_elasticsearch *ctx)
7584
{
85+
if (ctx->ra_tag_key) {
86+
flb_ra_destroy(ctx->ra_tag_key);
87+
}
88+
7689
flb_log_event_encoder_destroy(ctx->log_encoder);
7790

7891
/* release all connections */

0 commit comments

Comments
 (0)