Skip to content

Commit fdfe2e0

Browse files
committed
out_es: Add target_index variable using record accessor syntax.
Add target_index variable for rendering ES index name using record accessor syntax without extra bytes (i.e.: no time component on index name).
1 parent 7c3136c commit fdfe2e0

File tree

4 files changed

+188
-3
lines changed

4 files changed

+188
-3
lines changed

plugins/out_es/es.c

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ static int elasticsearch_format(struct flb_config *config,
295295
size_t off = 0;
296296
size_t off_prev = 0;
297297
char *es_index;
298+
char target_index_value[256];
298299
char logstash_index[256];
299300
char time_formatted[256];
300301
char index_formatted[256];
@@ -350,13 +351,15 @@ static int elasticsearch_format(struct flb_config *config,
350351
}
351352

352353
/*
353-
* If logstash format and id generation are disabled, pre-generate
354-
* the index line for all records.
354+
* If logstash format, target_index record accessor and id generation are disabled,
355+
* pre-generate the index line for all records.
355356
*
356357
* The header stored in 'j_index' will be used for the all records on
357358
* this payload.
358359
*/
359-
if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) {
360+
if (ctx->logstash_format == FLB_FALSE &&
361+
ctx->generate_id == FLB_FALSE && ctx->ra_target_index == NULL) {
362+
360363
flb_time_get(&tms);
361364
gmtime_r(&tms.tm.tv_sec, &tm);
362365
strftime(index_formatted, sizeof(index_formatted) - 1,
@@ -453,6 +456,42 @@ static int elasticsearch_format(struct flb_config *config,
453456
msgpack_pack_str_body(&tmp_pck, time_formatted, s);
454457

455458
es_index = ctx->index;
459+
if (ctx->ra_target_index) {
460+
flb_sds_t v = flb_ra_translate_check (ctx->ra_target_index,
461+
(char *) tag, tag_len,
462+
map, NULL, FLB_TRUE);
463+
if (v) {
464+
len = flb_sds_len(v);
465+
if (len > 128) {
466+
len = 128;
467+
}
468+
memcpy(target_index_value, v, len);
469+
target_index_value[len] = '\0';
470+
es_index_custom_len = len;
471+
flb_sds_destroy(v);
472+
es_index = target_index_value;
473+
}
474+
else {
475+
flb_plg_warn(ctx->ins,
476+
"the value of %s is missing for target_index_key",
477+
ctx->target_index);
478+
}
479+
if (ctx->generate_id == FLB_FALSE) {
480+
if (ctx->suppress_type_name) {
481+
index_len = flb_sds_snprintf (&j_index,
482+
flb_sds_alloc (j_index),
483+
ES_BULK_INDEX_FMT_WITHOUT_TYPE,
484+
ctx->es_action, es_index);
485+
}
486+
else {
487+
index_len = flb_sds_snprintf (&j_index,
488+
flb_sds_alloc (j_index),
489+
ES_BULK_INDEX_FMT,
490+
ctx->es_action,
491+
es_index, ctx->type);
492+
}
493+
}
494+
}
456495
if (ctx->logstash_format == FLB_TRUE) {
457496
ret = compose_index_header(ctx, es_index_custom_len,
458497
&logstash_index[0], sizeof(logstash_index),
@@ -1180,6 +1219,13 @@ static struct flb_config_map config_map[] = {
11801219
0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
11811220
"If set, _id will be the value of the key from incoming record."
11821221
},
1222+
{
1223+
FLB_CONFIG_MAP_STR, "target_index", NULL,
1224+
0, FLB_TRUE, offsetof(struct flb_elasticsearch, target_index),
1225+
"Compose index name using record accessor syntax. If any record accessor "
1226+
"variable is invalid (i.e.: The key is not present in event) the value of 'index' "
1227+
"will be used."
1228+
},
11831229
{
11841230
FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
11851231
0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),

plugins/out_es/es.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ struct flb_elasticsearch {
118118
flb_sds_t id_key;
119119
struct flb_record_accessor *ra_id_key;
120120

121+
/* target_index */
122+
flb_sds_t target_index;
123+
struct flb_record_accessor *ra_target_index;
124+
121125
/* include_tag_key */
122126
int include_tag_key;
123127
flb_sds_t tag_key;

plugins/out_es/es_conf.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,17 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
271271
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
272272
}
273273

274+
/* Set index record accessor */
275+
if (ctx->target_index) {
276+
ctx->ra_target_index = flb_ra_create(ctx->target_index, FLB_TRUE);
277+
fprintf(stderr, "record accessor at %p", ctx->ra_target_index);
278+
if (!ctx->ra_target_index) {
279+
flb_plg_error(ctx->ins, "invalid record accessor expression for index '%s'", ctx->target_index);
280+
flb_es_conf_destroy(ctx);
281+
return NULL;
282+
}
283+
}
284+
274285
if (ctx->id_key) {
275286
ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
276287
if (ctx->ra_id_key == NULL) {
@@ -479,6 +490,10 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
479490
flb_ra_destroy(ctx->ra_id_key);
480491
ctx->ra_id_key = NULL;
481492
}
493+
if (ctx->ra_target_index) {
494+
flb_ra_destroy(ctx->ra_target_index);
495+
ctx->ra_target_index = NULL;
496+
}
482497
if (ctx->es_action) {
483498
flb_free(ctx->es_action);
484499
}

tests/runtime/out_elasticsearch.c

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,35 @@ static void cb_check_id_key(void *ctx, int ffd,
167167
flb_free(res_data);
168168
}
169169

170+
static void cb_check_target_index(void *ctx, int ffd,
171+
int res_ret, void *res_data, size_t res_size,
172+
void *data)
173+
{
174+
char *p;
175+
char *out_js = res_data;
176+
char *index_line = "{\"create\":{\"_index\":\"aaa-JSON_END\",\"_type\":\"_doc\"}";
177+
178+
p = strstr(out_js, index_line);
179+
TEST_CHECK(p != NULL);
180+
flb_free(res_data);
181+
}
182+
183+
static void cb_check_target_index_default(void *ctx, int ffd,
184+
int res_ret, void *res_data, size_t res_size,
185+
void *data)
186+
{
187+
char *p;
188+
char *out_js = res_data;
189+
char *index_line = "{\"create\":{\"_index\":\"default\",\"_type\":\"_doc\"}";
190+
191+
p = strstr(out_js, index_line);
192+
TEST_CHECK(p != NULL);
193+
if(!TEST_CHECK(p != NULL)) {
194+
TEST_MSG("Got: %s", out_js);
195+
}
196+
flb_free(res_data);
197+
}
198+
170199
void flb_test_write_operation_index()
171200
{
172201
int ret;
@@ -799,6 +828,95 @@ void flb_test_logstash_prefix_separator()
799828
flb_destroy(ctx);
800829
}
801830

831+
void flb_test_target_index()
832+
{
833+
int ret;
834+
int size = sizeof(JSON_ES) - 1;
835+
flb_ctx_t *ctx;
836+
int in_ffd;
837+
int out_ffd;
838+
839+
/* Create context, flush every second (some checks omitted here) */
840+
ctx = flb_create();
841+
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
842+
843+
/* Lib input mode */
844+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
845+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
846+
847+
/* Elasticsearch output */
848+
out_ffd = flb_output(ctx, (char *) "es", NULL);
849+
flb_output_set(ctx, out_ffd,
850+
"match", "test",
851+
NULL);
852+
853+
/* Override defaults of index and type */
854+
flb_output_set(ctx, out_ffd,
855+
"target_index", "aaa-$END_KEY",
856+
NULL);
857+
858+
/* Enable test mode */
859+
ret = flb_output_set_test(ctx, out_ffd, "formatter",
860+
cb_check_target_index,
861+
NULL, NULL);
862+
863+
/* Start */
864+
ret = flb_start(ctx);
865+
TEST_CHECK(ret == 0);
866+
867+
/* Ingest data sample */
868+
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);
869+
870+
sleep(2);
871+
flb_stop(ctx);
872+
flb_destroy(ctx);
873+
}
874+
875+
void flb_test_target_index_default()
876+
{
877+
int ret;
878+
int size = sizeof(JSON_ES) - 1;
879+
flb_ctx_t *ctx;
880+
int in_ffd;
881+
int out_ffd;
882+
883+
/* Create context, flush every second (some checks omitted here) */
884+
ctx = flb_create();
885+
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
886+
887+
/* Lib input mode */
888+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
889+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
890+
891+
/* Elasticsearch output */
892+
out_ffd = flb_output(ctx, (char *) "es", NULL);
893+
flb_output_set(ctx, out_ffd,
894+
"match", "test",
895+
NULL);
896+
897+
/* Override defaults of index and type */
898+
flb_output_set(ctx, out_ffd,
899+
"target_index", "aaa-$not_found_key",
900+
"index", "default",
901+
NULL);
902+
903+
/* Enable test mode */
904+
ret = flb_output_set_test(ctx, out_ffd, "formatter",
905+
cb_check_target_index_default,
906+
NULL, NULL);
907+
908+
/* Start */
909+
ret = flb_start(ctx);
910+
TEST_CHECK(ret == 0);
911+
912+
/* Ingest data sample */
913+
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);
914+
915+
sleep(2);
916+
flb_stop(ctx);
917+
flb_destroy(ctx);
918+
}
919+
802920
/* Test list */
803921
TEST_LIST = {
804922
{"long_index" , flb_test_long_index },
@@ -814,5 +932,7 @@ TEST_LIST = {
814932
{"replace_dots" , flb_test_replace_dots },
815933
{"id_key" , flb_test_id_key },
816934
{"logstash_prefix_separator" , flb_test_logstash_prefix_separator },
935+
{"target_index" , flb_test_target_index },
936+
{"target_index_default" , flb_test_target_index_default },
817937
{NULL, NULL}
818938
};

0 commit comments

Comments
 (0)