Skip to content

Commit 8a6a409

Browse files
committed
out_es: Detect if index uses RA syntax as in out_opensearch plugin
Signed-off-by: Victor Cabezas <[email protected]>
1 parent eedb470 commit 8a6a409

File tree

5 files changed

+34
-104
lines changed

5 files changed

+34
-104
lines changed

plugins/out_es/es.c

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ static int elasticsearch_format(struct flb_config *config,
295295
size_t off = 0;
296296
size_t off_prev = 0;
297297
char *es_index;
298-
char target_index_value[256];
298+
char index_value[256];
299299
char logstash_index[256];
300300
char time_formatted[256];
301301
char index_formatted[256];
@@ -351,14 +351,14 @@ static int elasticsearch_format(struct flb_config *config,
351351
}
352352

353353
/*
354-
* If logstash format, target_index record accessor and id generation are disabled,
354+
* If logstash format, index record accessor and id generation are disabled,
355355
* pre-generate the index line for all records.
356356
*
357357
* The header stored in 'j_index' will be used for the all records on
358358
* this payload.
359359
*/
360360
if (ctx->logstash_format == FLB_FALSE &&
361-
ctx->generate_id == FLB_FALSE && ctx->ra_target_index == NULL) {
361+
ctx->generate_id == FLB_FALSE && ctx->ra_index == NULL) {
362362

363363
flb_time_get(&tms);
364364
gmtime_r(&tms.tm.tv_sec, &tm);
@@ -456,25 +456,20 @@ static int elasticsearch_format(struct flb_config *config,
456456
msgpack_pack_str_body(&tmp_pck, time_formatted, s);
457457

458458
es_index = ctx->index;
459-
if (ctx->ra_target_index) {
460-
flb_sds_t v = flb_ra_translate_check (ctx->ra_target_index,
459+
if (ctx->ra_index) {
460+
flb_sds_t v = flb_ra_translate(ctx->ra_index,
461461
(char *) tag, tag_len,
462-
map, NULL, FLB_TRUE);
462+
map, NULL);
463463
if (v) {
464464
len = flb_sds_len(v);
465465
if (len > 128) {
466466
len = 128;
467467
}
468-
memcpy(target_index_value, v, len);
469-
target_index_value[len] = '\0';
468+
memcpy(index_value, v, len);
469+
index_value[len] = '\0';
470470
es_index_custom_len = len;
471471
flb_sds_destroy(v);
472-
es_index = target_index_value;
473-
}
474-
else {
475-
flb_plg_warn(ctx->ins,
476-
"the value of %s is missing for target_index_key",
477-
ctx->target_index);
472+
es_index = index_value;
478473
}
479474
if (ctx->generate_id == FLB_FALSE) {
480475
if (ctx->suppress_type_name) {
@@ -1219,13 +1214,6 @@ static struct flb_config_map config_map[] = {
12191214
0, FLB_TRUE, offsetof(struct flb_elasticsearch, id_key),
12201215
"If set, _id will be the value of the key from incoming record."
12211216
},
1222-
{
1223-
FLB_CONFIG_MAP_STR, "target_index", NULL,
1224-
0, FLB_TRUE, offsetof(struct flb_elasticsearch, target_index),
1225-
"Compose index name using record accessor syntax. If any record accessor "
1226-
"variable is invalid (i.e.: The key is not present in event) the value of 'index' "
1227-
"will be used."
1228-
},
12291217
{
12301218
FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
12311219
0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots),

plugins/out_es/es.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
struct flb_elasticsearch {
4040
/* Elasticsearch index (database) and type (table) */
4141
char *index;
42+
struct flb_record_accessor *ra_index;
43+
4244
char *type;
4345
char suppress_type_name;
4446

@@ -118,10 +120,6 @@ struct flb_elasticsearch {
118120
flb_sds_t id_key;
119121
struct flb_record_accessor *ra_id_key;
120122

121-
/* target_index */
122-
flb_sds_t target_index;
123-
struct flb_record_accessor *ra_target_index;
124-
125123
/* include_tag_key */
126124
int include_tag_key;
127125
flb_sds_t tag_key;

plugins/out_es/es_conf.c

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,20 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
250250
if (f_type) {
251251
ctx->type = flb_strdup(f_type->value); /* FIXME */
252252
}
253+
else {
254+
/* Check if the index has been set in the configuration */
255+
if (ctx->index) {
256+
/* do we have a record accessor pattern ? */
257+
if (strchr(ctx->index, '$')) {
258+
ctx->ra_index = flb_ra_create(ctx->index, FLB_TRUE);
259+
if (!ctx->ra_index) {
260+
flb_plg_error(ctx->ins, "invalid record accessor pattern set for 'index' property");
261+
flb_es_conf_destroy(ctx);
262+
return NULL;
263+
}
264+
}
265+
}
266+
}
253267

254268
/* HTTP Payload (response) maximum buffer size (0 == unlimited) */
255269
if (ctx->buffer_size == -1) {
@@ -271,15 +285,6 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins,
271285
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path);
272286
}
273287

274-
/* Set index record accessor */
275-
if (ctx->target_index) {
276-
ctx->ra_target_index = flb_ra_create(ctx->target_index, FLB_TRUE);
277-
if (!ctx->ra_target_index) {
278-
flb_plg_error(ctx->ins, "invalid record accessor expression for index '%s'", ctx->target_index);
279-
flb_es_conf_destroy(ctx);
280-
return NULL;
281-
}
282-
}
283288

284289
if (ctx->id_key) {
285290
ctx->ra_id_key = flb_ra_create(ctx->id_key, FLB_FALSE);
@@ -489,9 +494,9 @@ int flb_es_conf_destroy(struct flb_elasticsearch *ctx)
489494
flb_ra_destroy(ctx->ra_id_key);
490495
ctx->ra_id_key = NULL;
491496
}
492-
if (ctx->ra_target_index) {
493-
flb_ra_destroy(ctx->ra_target_index);
494-
ctx->ra_target_index = NULL;
497+
if (ctx->ra_index) {
498+
flb_ra_destroy(ctx->ra_index);
499+
ctx->ra_index = NULL;
495500
}
496501
if (ctx->es_action) {
497502
flb_free(ctx->es_action);

run_code_analysis.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ fi
3737
-e INPUT_DEPENDENCIES_DEBIAN="$ADDITIONAL_DEPS" \
3838
-e INPUT_CMAKEFLAGS="$FLB_CMAKE_OPTIONS $SKIP" \
3939
-e INPUT_PRE_COMMAND="cp -R /source /tmp" \
40+
-e INPUT_TEST_COMMAND="${INPUT_TEST_COMMAND}" \
4041
-e INPUT_WORKING-DIRECTORY="/tmp/source" \
4142
lpenz/ghaction-cmake:0.19

tests/runtime/out_elasticsearch.c

Lines changed: 5 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ static void cb_check_id_key(void *ctx, int ffd,
167167
flb_free(res_data);
168168
}
169169

170-
static void cb_check_target_index(void *ctx, int ffd,
170+
static void cb_check_index_ra(void *ctx, int ffd,
171171
int res_ret, void *res_data, size_t res_size,
172172
void *data)
173173
{
@@ -180,22 +180,6 @@ static void cb_check_target_index(void *ctx, int ffd,
180180
flb_free(res_data);
181181
}
182182

183-
static void cb_check_target_index_default(void *ctx, int ffd,
184-
int res_ret, void *res_data, size_t res_size,
185-
void *data)
186-
{
187-
char *p;
188-
char *out_js = res_data;
189-
char *index_line = "{\"create\":{\"_index\":\"default\",\"_type\":\"_doc\"}";
190-
191-
p = strstr(out_js, index_line);
192-
TEST_CHECK(p != NULL);
193-
if(!TEST_CHECK(p != NULL)) {
194-
TEST_MSG("Got: %s", out_js);
195-
}
196-
flb_free(res_data);
197-
}
198-
199183
void flb_test_write_operation_index()
200184
{
201185
int ret;
@@ -828,51 +812,7 @@ void flb_test_logstash_prefix_separator()
828812
flb_destroy(ctx);
829813
}
830814

831-
void flb_test_target_index()
832-
{
833-
int ret;
834-
int size = sizeof(JSON_ES) - 1;
835-
flb_ctx_t *ctx;
836-
int in_ffd;
837-
int out_ffd;
838-
839-
/* Create context, flush every second (some checks omitted here) */
840-
ctx = flb_create();
841-
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
842-
843-
/* Lib input mode */
844-
in_ffd = flb_input(ctx, (char *) "lib", NULL);
845-
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
846-
847-
/* Elasticsearch output */
848-
out_ffd = flb_output(ctx, (char *) "es", NULL);
849-
flb_output_set(ctx, out_ffd,
850-
"match", "test",
851-
NULL);
852-
853-
/* Override defaults of index and type */
854-
flb_output_set(ctx, out_ffd,
855-
"target_index", "aaa-$END_KEY",
856-
NULL);
857-
858-
/* Enable test mode */
859-
ret = flb_output_set_test(ctx, out_ffd, "formatter",
860-
cb_check_target_index,
861-
NULL, NULL);
862-
863-
/* Start */
864-
ret = flb_start(ctx);
865-
TEST_CHECK(ret == 0);
866-
867-
/* Ingest data sample */
868-
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);
869-
870-
sleep(2);
871-
flb_stop(ctx);
872-
flb_destroy(ctx);
873-
}
874-
875-
void flb_test_target_index_default()
815+
void flb_test_index_ra()
876816
{
877817
int ret;
878818
int size = sizeof(JSON_ES) - 1;
@@ -896,13 +836,12 @@ void flb_test_target_index_default()
896836

897837
/* Override defaults of index and type */
898838
flb_output_set(ctx, out_ffd,
899-
"target_index", "aaa-$not_found_key",
900-
"index", "default",
839+
"index", "aaa-$END_KEY",
901840
NULL);
902841

903842
/* Enable test mode */
904843
ret = flb_output_set_test(ctx, out_ffd, "formatter",
905-
cb_check_target_index_default,
844+
cb_check_index_ra,
906845
NULL, NULL);
907846

908847
/* Start */
@@ -926,13 +865,12 @@ TEST_LIST = {
926865
{"write_operation_update", flb_test_write_operation_update },
927866
{"write_operation_upsert", flb_test_write_operation_upsert },
928867
{"index_type" , flb_test_index_type },
868+
{"index_ra" , flb_test_index_ra},
929869
{"logstash_format" , flb_test_logstash_format },
930870
{"logstash_format_nanos" , flb_test_logstash_format_nanos },
931871
{"tag_key" , flb_test_tag_key },
932872
{"replace_dots" , flb_test_replace_dots },
933873
{"id_key" , flb_test_id_key },
934874
{"logstash_prefix_separator" , flb_test_logstash_prefix_separator },
935-
{"target_index" , flb_test_target_index },
936-
{"target_index_default" , flb_test_target_index_default },
937875
{NULL, NULL}
938876
};

0 commit comments

Comments
 (0)