diff --git a/plugins/filter_parser/filter_parser.c b/plugins/filter_parser/filter_parser.c index 64a504ef905..bcc722f617c 100644 --- a/plugins/filter_parser/filter_parser.c +++ b/plugins/filter_parser/filter_parser.c @@ -93,6 +93,53 @@ static int delete_parsers(struct filter_parser_ctx *ctx) return c; } +static int nest_raw_map(struct filter_parser_ctx *ctx, + char **buf, + size_t *size, + const flb_sds_t key) +{ + msgpack_sbuffer sbuf; + msgpack_packer pk; + msgpack_unpacked outbuf_result; + msgpack_object obj; + msgpack_object_kv *kv; + const size_t key_len = flb_sds_len(key); + int ret = 0; + + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + + msgpack_unpacked_init(&outbuf_result); + ret = msgpack_unpack_next(&outbuf_result, *buf, *size, NULL); + if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, + "Nest: failed to unpack msgpack data with error code %d", + ret); + msgpack_unpacked_destroy(&outbuf_result); + return -1; + } + + /* Create a new map, unpacking map `buf` under the new `key` root key */ + obj = outbuf_result.data; + if (obj.type == MSGPACK_OBJECT_MAP) { + msgpack_pack_map(&pk, 1); + msgpack_pack_str(&pk, key_len); + msgpack_pack_str_body(&pk, key, key_len); + msgpack_pack_map(&pk, obj.via.map.size); + for (unsigned x = 0; x < obj.via.map.size; ++x) { + kv = &obj.via.map.ptr[x]; + msgpack_pack_object(&pk, kv->key); + msgpack_pack_object(&pk, kv->val); + } + flb_free(*buf); + *buf = sbuf.data; + *size = sbuf.size; + } + + msgpack_unpacked_destroy(&outbuf_result); + return 0; +} + static int configure(struct filter_parser_ctx *ctx, struct flb_filter_instance *f_ins, struct flb_config *config) @@ -301,6 +348,9 @@ static int cb_parser_filter(const void *data, size_t bytes, } if (out_buf != NULL && parse_ret >= 0) { + if (ctx->nest_under) { + nest_raw_map(ctx, &out_buf, &out_size, ctx->nest_under); + } if (append_arr != NULL && append_arr_len > 0) { char *new_buf = NULL; int new_size; @@ -440,6 +490,11 @@ static struct flb_config_map config_map[] = { "Keep all other original fields in the parsed result. " "If false, all other original fields will be removed." }, + { + FLB_CONFIG_MAP_STR, "Nest_Under", NULL, + 0, FLB_TRUE, offsetof(struct filter_parser_ctx, nest_under), + "Specify field name to nest parsed records under." + }, { FLB_CONFIG_MAP_DEPRECATED, "Unescape_key", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/filter_parser/filter_parser.h b/plugins/filter_parser/filter_parser.h index 36b26d9ec34..a12bdd11ffb 100644 --- a/plugins/filter_parser/filter_parser.h +++ b/plugins/filter_parser/filter_parser.h @@ -35,6 +35,7 @@ struct filter_parser_ctx { int key_name_len; int reserve_data; int preserve_key; + flb_sds_t nest_under; struct mk_list parsers; struct flb_filter_instance *ins; }; diff --git a/tests/runtime/filter_parser.c b/tests/runtime/filter_parser.c index 24368afc69d..fc5044b22bf 100644 --- a/tests/runtime/filter_parser.c +++ b/tests/runtime/filter_parser.c @@ -1299,6 +1299,83 @@ void flb_test_filter_parser_reserve_on_preserve_on() test_ctx_destroy(ctx); } +void flb_test_filter_parser_nest_under_on() +{ + int ret; + int bytes; + char *p, *output, *expected; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + struct flb_parser *parser; + + struct flb_lib_out_cb cb; + cb.cb = callback_test; + cb.data = NULL; + + clear_output(); + + ctx = flb_create(); + + /* Configure service */ + flb_service_set(ctx, "Flush", FLUSH_INTERVAL, "Grace" "1", "Log_Level", "debug", NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, + "Tag", "test", + NULL); + + /* Parser */ + parser = flb_parser_create("json", "json", NULL, + FLB_FALSE, + NULL, NULL, NULL, MK_FALSE, MK_TRUE, FLB_FALSE, FLB_FALSE, + NULL, 0, NULL, ctx->config); + TEST_CHECK(parser != NULL); + + /* Filter */ + filter_ffd = flb_filter(ctx, (char *) "parser", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "test", + "Key_Name", "to_parse", + "Nest_Under", "nest_key", + "Parser", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Output */ + out_ffd = flb_output(ctx, (char *) "lib", &cb); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "Match", "*", + "format", "json", + NULL); + + /* Start the engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Ingest data */ + p = "[1,{\"hello\":\"world\",\"some_object\":{\"foo\":\"bar\"},\"to_parse\":\"{\\\"key\\\":\\\"value\\\",\\\"object\\\":{\\\"a\\\":\\\"b\\\"}}\"}]"; + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + + wait_with_timeout(1500, &output); /* waiting flush and ensuring data flush */ + TEST_CHECK_(output != NULL, "Expected output to not be NULL"); + if (output != NULL) { + /* check extra data was not preserved */ + expected = "{\"nest_key\":{\"key\":\"value\",\"object\":{\"a\":\"b\"}}}"; + TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain key one , got '%s'", output); + free(output); + } + + flb_stop(ctx); + flb_destroy(ctx); +} + TEST_LIST = { {"filter_parser_extract_fields", flb_test_filter_parser_extract_fields }, {"filter_parser_reserve_data_off", flb_test_filter_parser_reserve_data_off }, @@ -1313,6 +1390,7 @@ TEST_LIST = { {"filter_parser_reserve_off_preserve_on", flb_test_filter_parser_reserve_off_preserve_on}, {"filter_parser_reserve_on_preserve_off", flb_test_filter_parser_reserve_on_preserve_off}, {"filter_parser_reserve_on_preserve_on", flb_test_filter_parser_reserve_on_preserve_on}, + {"filter_parser_nest_under_on", flb_test_filter_parser_nest_under_on}, {NULL, NULL} };