diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index f934ac7d8e3..279f5f21718 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -150,6 +150,9 @@ struct flb_route { struct flb_input_routes { flb_sds_t input_name; + flb_sds_t plugin_name; + int has_alias; + struct flb_input_instance *instance; struct cfl_list processors; struct cfl_list routes; struct cfl_list _head; diff --git a/src/flb_router_config.c b/src/flb_router_config.c index 9c21a481161..f743e3ba986 100644 --- a/src/flb_router_config.c +++ b/src/flb_router_config.c @@ -388,6 +388,10 @@ static void input_routes_destroy(struct flb_input_routes *input) flb_sds_destroy(input->input_name); } + if (input->plugin_name) { + flb_sds_destroy(input->plugin_name); + } + flb_free(input); } @@ -1080,6 +1084,8 @@ static int parse_input_section(struct flb_cf_section *section, struct cfl_list *input_routes, struct flb_config *config) { + uint32_t mask; + size_t before_count; struct flb_input_routes *input; struct cfl_kvlist *kvlist; struct cfl_variant *name_var; @@ -1088,8 +1094,7 @@ static int parse_input_section(struct flb_cf_section *section, struct cfl_kvlist *routes_kvlist; struct cfl_list *head; struct cfl_kvpair *pair; - uint32_t mask; - size_t before_count; + struct cfl_variant *alias_var; if (!section || !input_routes) { return -1; @@ -1130,13 +1135,29 @@ static int parse_input_section(struct flb_cf_section *section, cfl_list_init(&input->_head); cfl_list_init(&input->processors); cfl_list_init(&input->routes); + input->has_alias = FLB_FALSE; + input->instance = NULL; - input->input_name = copy_from_cfl_sds(name_var->data.as_string); - if (!input->input_name) { + input->plugin_name = copy_from_cfl_sds(name_var->data.as_string); + if (!input->plugin_name) { flb_free(input); return -1; } + alias_var = cfl_kvlist_fetch(kvlist, "alias"); + if (alias_var && alias_var->type == CFL_VARIANT_STRING && + cfl_sds_len(alias_var->data.as_string) > 0) { + input->input_name = copy_from_cfl_sds(alias_var->data.as_string); + input->has_alias = FLB_TRUE; + } + else { + input->input_name = copy_from_cfl_sds(name_var->data.as_string); + } + if (!input->input_name) { + input_routes_destroy(input); + return -1; + } + processors_var = cfl_kvlist_fetch(kvlist, "processors"); if (processors_var) { if (parse_processors(processors_var, &input->processors, config) != 0) { @@ -1223,33 +1244,110 @@ int flb_router_config_parse(struct flb_cf *cf, } /* Apply parsed router configuration to actual input/output instances */ +static int input_instance_already_selected(struct flb_config *config, + struct flb_input_routes *current, + struct flb_input_instance *candidate) +{ + struct cfl_list *head; + struct flb_input_routes *routes; + + if (!config || !candidate) { + return FLB_FALSE; + } + + cfl_list_foreach(head, &config->input_routes) { + routes = cfl_list_entry(head, struct flb_input_routes, _head); + + if (routes == current) { + continue; + } + + if (routes->instance == candidate) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + static struct flb_input_instance *find_input_instance(struct flb_config *config, - flb_sds_t name) + struct flb_input_routes *routes) { struct mk_list *head; struct flb_input_instance *ins; + size_t key_len; - if (!config || !name) { + if (!config || !routes) { return NULL; } - mk_list_foreach(head, &config->inputs) { - ins = mk_list_entry(head, struct flb_input_instance, _head); + if (routes->instance) { + return routes->instance; + } - if (!ins->p) { - continue; + if (routes->has_alias && routes->input_name) { + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + + if (!ins->p || !ins->alias) { + continue; + } + + if (strcmp(ins->alias, routes->input_name) == 0 && + input_instance_already_selected(config, routes, ins) == FLB_FALSE) { + routes->instance = ins; + return ins; + } } + } - if (ins->alias && strcmp(ins->alias, name) == 0) { - return ins; + if (routes->input_name) { + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + + if (!ins->p) { + continue; + } + + if (strcmp(ins->name, routes->input_name) == 0 && + input_instance_already_selected(config, routes, ins) == FLB_FALSE) { + routes->instance = ins; + return ins; + } } + } - if (strcmp(ins->name, name) == 0) { - return ins; + if (routes->plugin_name) { + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + + if (!ins->p || !ins->p->name) { + continue; + } + + if (strcmp(ins->p->name, routes->plugin_name) == 0 && + input_instance_already_selected(config, routes, ins) == FLB_FALSE) { + routes->instance = ins; + return ins; + } } + } - if (ins->p->name && strcmp(ins->p->name, name) == 0) { - return ins; + if (routes->input_name) { + key_len = flb_sds_len(routes->input_name); + + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + + if (!ins->p || key_len == 0) { + continue; + } + + if (strncmp(ins->name, routes->input_name, key_len) == 0 && + input_instance_already_selected(config, routes, ins) == FLB_FALSE) { + routes->instance = ins; + return ins; + } } } @@ -1355,7 +1453,7 @@ int flb_router_apply_config(struct flb_config *config) cfl_list_foreach(input_head, &config->input_routes) { input_routes = cfl_list_entry(input_head, struct flb_input_routes, _head); - input_ins = find_input_instance(config, input_routes->input_name); + input_ins = find_input_instance(config, input_routes); if (!input_ins) { flb_warn("[router] could not find input instance '%s' for routes", input_routes->input_name ? input_routes->input_name : "(null)"); diff --git a/tests/internal/conditional_routing.c b/tests/internal/conditional_routing.c index 38c4c19cf2a..25e89b04a6d 100644 --- a/tests/internal/conditional_routing.c +++ b/tests/internal/conditional_routing.c @@ -883,6 +883,8 @@ static void setup_conditional_routes(struct flb_input_routes *input_routes, cfl_list_init(&input_routes->_head); cfl_list_init(&input_routes->routes); input_routes->input_name = flb_sds_create("tail"); + input_routes->plugin_name = flb_sds_create("tail"); + input_routes->has_alias = FLB_FALSE; /* Route 1: info_logs */ memset(route1, 0, sizeof(struct flb_route)); @@ -971,6 +973,7 @@ static void cleanup_conditional_routing_instances(struct flb_config *config, flb_sds_destroy(output2->alias); flb_sds_destroy(output3->alias); flb_sds_destroy(input_routes->input_name); + flb_sds_destroy(input_routes->plugin_name); flb_sds_destroy(route1->name); flb_sds_destroy(route2->name); flb_sds_destroy(route3->name); diff --git a/tests/internal/router_config.c b/tests/internal/router_config.c index 8b2067c89b0..094eb9e2dd8 100644 --- a/tests/internal/router_config.c +++ b/tests/internal/router_config.c @@ -1163,6 +1163,8 @@ void test_router_apply_config_success() cfl_list_init(&input_routes._head); cfl_list_init(&input_routes.routes); input_routes.input_name = flb_sds_create("dummy"); + input_routes.plugin_name = flb_sds_create("dummy"); + input_routes.has_alias = FLB_FALSE; cfl_list_add(&input_routes._head, &config.input_routes); memset(&route, 0, sizeof(route)); @@ -1188,6 +1190,7 @@ void test_router_apply_config_success() flb_sds_destroy(input.alias); flb_sds_destroy(output.alias); flb_sds_destroy(input_routes.input_name); + flb_sds_destroy(input_routes.plugin_name); flb_sds_destroy(route.name); flb_sds_destroy(route_output.name); } @@ -1210,6 +1213,8 @@ void test_router_apply_config_missing_output() cfl_list_init(&input_routes._head); cfl_list_init(&input_routes.routes); input_routes.input_name = flb_sds_create("dummy"); + input_routes.plugin_name = flb_sds_create("dummy"); + input_routes.has_alias = FLB_FALSE; cfl_list_add(&input_routes._head, &config.input_routes); memset(&route, 0, sizeof(route)); @@ -1234,10 +1239,296 @@ void test_router_apply_config_missing_output() flb_sds_destroy(input.alias); flb_sds_destroy(output.alias); flb_sds_destroy(input_routes.input_name); + flb_sds_destroy(input_routes.plugin_name); flb_sds_destroy(route.name); flb_sds_destroy(route_output.name); } +void test_router_apply_config_uses_input_alias() +{ + struct flb_config config; + struct flb_input_instance input_one; + struct flb_input_instance input_two; + struct flb_output_instance output_one; + struct flb_output_instance output_two; + struct flb_input_routes routes_one; + struct flb_input_routes routes_two; + struct flb_route route_one; + struct flb_route route_two; + struct flb_route_output route_output_one; + struct flb_route_output route_output_two; + struct flb_input_plugin input_plugin; + struct flb_output_plugin output_plugin_one; + struct flb_output_plugin output_plugin_two; + struct flb_router_path *path; + + memset(&config, 0, sizeof(config)); + mk_list_init(&config.inputs); + mk_list_init(&config.outputs); + cfl_list_init(&config.input_routes); + + memset(&input_one, 0, sizeof(input_one)); + mk_list_init(&input_one._head); + cfl_list_init(&input_one.routes_direct); + cfl_list_init(&input_one.routes); + mk_list_init(&input_one.tasks); + mk_list_init(&input_one.chunks); + mk_list_init(&input_one.collectors); + snprintf(input_one.name, sizeof(input_one.name), "dummy.0"); + input_one.alias = flb_sds_create("input_one"); + input_one.p = &input_plugin; + input_one.config = &config; + mk_list_add(&input_one._head, &config.inputs); + + memset(&input_two, 0, sizeof(input_two)); + mk_list_init(&input_two._head); + cfl_list_init(&input_two.routes_direct); + cfl_list_init(&input_two.routes); + mk_list_init(&input_two.tasks); + mk_list_init(&input_two.chunks); + mk_list_init(&input_two.collectors); + snprintf(input_two.name, sizeof(input_two.name), "dummy.1"); + input_two.alias = flb_sds_create("input_two"); + input_two.p = &input_plugin; + input_two.config = &config; + mk_list_add(&input_two._head, &config.inputs); + + memset(&input_plugin, 0, sizeof(input_plugin)); + input_plugin.name = "dummy"; + + memset(&output_one, 0, sizeof(output_one)); + mk_list_init(&output_one._head); + mk_list_init(&output_one.properties); + mk_list_init(&output_one.net_properties); + snprintf(output_one.name, sizeof(output_one.name), "stdout.0"); + output_one.alias = flb_sds_create("print_one"); + output_one.event_type = FLB_OUTPUT_LOGS; + output_one.p = &output_plugin_one; + mk_list_add(&output_one._head, &config.outputs); + + memset(&output_two, 0, sizeof(output_two)); + mk_list_init(&output_two._head); + mk_list_init(&output_two.properties); + mk_list_init(&output_two.net_properties); + snprintf(output_two.name, sizeof(output_two.name), "stdout.1"); + output_two.alias = flb_sds_create("print_two"); + output_two.event_type = FLB_OUTPUT_LOGS; + output_two.p = &output_plugin_two; + mk_list_add(&output_two._head, &config.outputs); + + memset(&output_plugin_one, 0, sizeof(output_plugin_one)); + output_plugin_one.name = "stdout"; + memset(&output_plugin_two, 0, sizeof(output_plugin_two)); + output_plugin_two.name = "stdout"; + + memset(&routes_one, 0, sizeof(routes_one)); + cfl_list_init(&routes_one._head); + cfl_list_init(&routes_one.routes); + routes_one.input_name = flb_sds_create("input_one"); + routes_one.plugin_name = flb_sds_create("dummy"); + routes_one.has_alias = FLB_TRUE; + cfl_list_add(&routes_one._head, &config.input_routes); + + memset(&route_one, 0, sizeof(route_one)); + cfl_list_init(&route_one._head); + cfl_list_init(&route_one.outputs); + route_one.name = flb_sds_create("route_one"); + route_one.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route_one._head, &routes_one.routes); + + memset(&route_output_one, 0, sizeof(route_output_one)); + cfl_list_init(&route_output_one._head); + route_output_one.name = flb_sds_create("print_one"); + cfl_list_add(&route_output_one._head, &route_one.outputs); + + memset(&routes_two, 0, sizeof(routes_two)); + cfl_list_init(&routes_two._head); + cfl_list_init(&routes_two.routes); + routes_two.input_name = flb_sds_create("input_two"); + routes_two.plugin_name = flb_sds_create("dummy"); + routes_two.has_alias = FLB_TRUE; + cfl_list_add(&routes_two._head, &config.input_routes); + + memset(&route_two, 0, sizeof(route_two)); + cfl_list_init(&route_two._head); + cfl_list_init(&route_two.outputs); + route_two.name = flb_sds_create("route_two"); + route_two.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route_two._head, &routes_two.routes); + + memset(&route_output_two, 0, sizeof(route_output_two)); + cfl_list_init(&route_output_two._head); + route_output_two.name = flb_sds_create("print_two"); + cfl_list_add(&route_output_two._head, &route_two.outputs); + + TEST_CHECK(flb_router_apply_config(&config) == 0); + + TEST_CHECK(cfl_list_size(&input_one.routes_direct) == 1); + path = cfl_list_entry(input_one.routes_direct.next, struct flb_router_path, _head); + TEST_CHECK(path->ins == &output_one); + + TEST_CHECK(cfl_list_size(&input_two.routes_direct) == 1); + path = cfl_list_entry(input_two.routes_direct.next, struct flb_router_path, _head); + TEST_CHECK(path->ins == &output_two); + + flb_router_exit(&config); + + flb_sds_destroy(input_one.alias); + flb_sds_destroy(input_two.alias); + flb_sds_destroy(output_one.alias); + flb_sds_destroy(output_two.alias); + flb_sds_destroy(routes_one.input_name); + flb_sds_destroy(routes_one.plugin_name); + flb_sds_destroy(routes_two.input_name); + flb_sds_destroy(routes_two.plugin_name); + flb_sds_destroy(route_one.name); + flb_sds_destroy(route_two.name); + flb_sds_destroy(route_output_one.name); + flb_sds_destroy(route_output_two.name); +} + +void test_router_apply_config_distinct_instances_without_alias() +{ + struct flb_config config; + struct flb_input_instance input_one; + struct flb_input_instance input_two; + struct flb_output_instance output_one; + struct flb_output_instance output_two; + struct flb_input_routes routes_one; + struct flb_input_routes routes_two; + struct flb_route route_one; + struct flb_route route_two; + struct flb_route_output route_output_one; + struct flb_route_output route_output_two; + struct flb_input_plugin input_plugin; + struct flb_output_plugin output_plugin_one; + struct flb_output_plugin output_plugin_two; + struct flb_router_path *path; + + memset(&config, 0, sizeof(config)); + mk_list_init(&config.inputs); + mk_list_init(&config.outputs); + cfl_list_init(&config.input_routes); + + memset(&input_one, 0, sizeof(input_one)); + mk_list_init(&input_one._head); + cfl_list_init(&input_one.routes_direct); + cfl_list_init(&input_one.routes); + mk_list_init(&input_one.tasks); + mk_list_init(&input_one.chunks); + mk_list_init(&input_one.collectors); + snprintf(input_one.name, sizeof(input_one.name), "dummy.0"); + input_one.p = &input_plugin; + input_one.config = &config; + mk_list_add(&input_one._head, &config.inputs); + + memset(&input_two, 0, sizeof(input_two)); + mk_list_init(&input_two._head); + cfl_list_init(&input_two.routes_direct); + cfl_list_init(&input_two.routes); + mk_list_init(&input_two.tasks); + mk_list_init(&input_two.chunks); + mk_list_init(&input_two.collectors); + snprintf(input_two.name, sizeof(input_two.name), "dummy.1"); + input_two.p = &input_plugin; + input_two.config = &config; + mk_list_add(&input_two._head, &config.inputs); + + memset(&input_plugin, 0, sizeof(input_plugin)); + input_plugin.name = "dummy"; + + memset(&output_one, 0, sizeof(output_one)); + mk_list_init(&output_one._head); + mk_list_init(&output_one.properties); + mk_list_init(&output_one.net_properties); + snprintf(output_one.name, sizeof(output_one.name), "stdout.0"); + output_one.alias = flb_sds_create("print_one"); + output_one.event_type = FLB_OUTPUT_LOGS; + output_one.p = &output_plugin_one; + mk_list_add(&output_one._head, &config.outputs); + + memset(&output_two, 0, sizeof(output_two)); + mk_list_init(&output_two._head); + mk_list_init(&output_two.properties); + mk_list_init(&output_two.net_properties); + snprintf(output_two.name, sizeof(output_two.name), "stdout.1"); + output_two.alias = flb_sds_create("print_two"); + output_two.event_type = FLB_OUTPUT_LOGS; + output_two.p = &output_plugin_two; + mk_list_add(&output_two._head, &config.outputs); + + memset(&output_plugin_one, 0, sizeof(output_plugin_one)); + output_plugin_one.name = "stdout"; + memset(&output_plugin_two, 0, sizeof(output_plugin_two)); + output_plugin_two.name = "stdout"; + + memset(&routes_one, 0, sizeof(routes_one)); + cfl_list_init(&routes_one._head); + cfl_list_init(&routes_one.routes); + routes_one.input_name = flb_sds_create("dummy"); + routes_one.plugin_name = flb_sds_create("dummy"); + routes_one.has_alias = FLB_FALSE; + cfl_list_add(&routes_one._head, &config.input_routes); + + memset(&route_one, 0, sizeof(route_one)); + cfl_list_init(&route_one._head); + cfl_list_init(&route_one.outputs); + route_one.name = flb_sds_create("route_one"); + route_one.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route_one._head, &routes_one.routes); + + memset(&route_output_one, 0, sizeof(route_output_one)); + cfl_list_init(&route_output_one._head); + route_output_one.name = flb_sds_create("print_one"); + cfl_list_add(&route_output_one._head, &route_one.outputs); + + memset(&routes_two, 0, sizeof(routes_two)); + cfl_list_init(&routes_two._head); + cfl_list_init(&routes_two.routes); + routes_two.input_name = flb_sds_create("dummy"); + routes_two.plugin_name = flb_sds_create("dummy"); + routes_two.has_alias = FLB_FALSE; + cfl_list_add(&routes_two._head, &config.input_routes); + + memset(&route_two, 0, sizeof(route_two)); + cfl_list_init(&route_two._head); + cfl_list_init(&route_two.outputs); + route_two.name = flb_sds_create("route_two"); + route_two.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route_two._head, &routes_two.routes); + + memset(&route_output_two, 0, sizeof(route_output_two)); + cfl_list_init(&route_output_two._head); + route_output_two.name = flb_sds_create("print_two"); + cfl_list_add(&route_output_two._head, &route_two.outputs); + + TEST_CHECK(flb_router_apply_config(&config) == 0); + + TEST_CHECK(cfl_list_size(&input_one.routes_direct) == 1); + path = cfl_list_entry(input_one.routes_direct.next, struct flb_router_path, _head); + TEST_CHECK(path->ins == &output_one); + + TEST_CHECK(cfl_list_size(&input_two.routes_direct) == 1); + path = cfl_list_entry(input_two.routes_direct.next, struct flb_router_path, _head); + TEST_CHECK(path->ins == &output_two); + + TEST_CHECK(routes_one.instance == &input_one); + TEST_CHECK(routes_two.instance == &input_two); + + flb_router_exit(&config); + + flb_sds_destroy(output_one.alias); + flb_sds_destroy(output_two.alias); + flb_sds_destroy(routes_one.input_name); + flb_sds_destroy(routes_one.plugin_name); + flb_sds_destroy(routes_two.input_name); + flb_sds_destroy(routes_two.plugin_name); + flb_sds_destroy(route_one.name); + flb_sds_destroy(route_two.name); + flb_sds_destroy(route_output_one.name); + flb_sds_destroy(route_output_two.name); +} + void test_router_route_default_precedence() { struct cfl_list routes; @@ -2048,6 +2339,8 @@ TEST_LIST = { { "parse_contexts_file", test_router_config_parse_file_contexts }, { "apply_config_success", test_router_apply_config_success }, { "apply_config_missing_output", test_router_apply_config_missing_output }, + { "apply_config_uses_input_alias", test_router_apply_config_uses_input_alias }, + { "apply_config_distinct_instances_without_alias", test_router_apply_config_distinct_instances_without_alias }, { "route_default_precedence", test_router_route_default_precedence }, { "condition_eval_logs_metadata_context", test_router_condition_eval_logs_metadata_context }, { "condition_eval_logs_group_context", test_router_condition_eval_logs_group_context },