diff --git a/include/fluent-bit/config_format/flb_cf.h b/include/fluent-bit/config_format/flb_cf.h index b5ae20fa327..6c6233c28bc 100644 --- a/include/fluent-bit/config_format/flb_cf.h +++ b/include/fluent-bit/config_format/flb_cf.h @@ -174,6 +174,11 @@ struct cfl_variant *flb_cf_section_property_add(struct flb_cf *cf, char *k_buf, size_t k_len, char *v_buf, size_t v_len); +struct cfl_variant *flb_cf_section_property_add_variant(struct flb_cf *cf, + struct cfl_kvlist *kv_list, + char *k_buf, size_t k_len, + struct cfl_variant *variant); + struct cfl_array *flb_cf_section_property_add_list(struct flb_cf *cf, struct cfl_kvlist *kv_list, char *k_buf, size_t k_len); diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index e1e8f2f0783..10fe72a48b8 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -27,6 +27,7 @@ #include #include #include +#include #include @@ -325,6 +326,9 @@ struct flb_config { int json_escape_unicode; int dry_run; + + /* New Router Configuration */ + struct cfl_list input_routes; }; #define FLB_CONFIG_LOG_LEVEL(c) (c->log->level) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index 81528248eb6..a8e4f2dddab 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -2,7 +2,7 @@ /* Fluent Bit * ========== - * Copyright (C) 2015-2024 The Fluent Bit Authors + * Copyright (C) 2015-2025 The Fluent Bit Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,11 @@ #include #include #include +#include +#include +#include +#include +#include struct flb_router_path { struct flb_output_instance *ins; @@ -56,6 +61,61 @@ static inline int flb_router_match_type(int in_event_type, return FLB_TRUE; } +enum flb_router_signal { + FLB_ROUTER_SIGNAL_LOGS = (1U << 0), + FLB_ROUTER_SIGNAL_METRICS = (1U << 1), + FLB_ROUTER_SIGNAL_TRACES = (1U << 2), + FLB_ROUTER_SIGNAL_ANY = (FLB_ROUTER_SIGNAL_LOGS | + FLB_ROUTER_SIGNAL_METRICS | + FLB_ROUTER_SIGNAL_TRACES) +}; + +struct flb_route_condition_rule { + flb_sds_t field; + flb_sds_t op; + flb_sds_t value; + struct cfl_list _head; +}; + +struct flb_route_condition { + struct cfl_list rules; + int is_default; +}; + +struct flb_route_output { + flb_sds_t name; + flb_sds_t fallback; + struct cfl_list _head; +}; + +struct flb_route_processor_property { + flb_sds_t key; + flb_sds_t value; + struct cfl_list _head; +}; + +struct flb_route_processor { + flb_sds_t name; + struct cfl_list properties; + struct cfl_list _head; +}; + +struct flb_route { + flb_sds_t name; + uint32_t signals; + struct flb_route_condition *condition; + struct cfl_list outputs; + struct cfl_list processors; + struct cfl_list _head; +}; + +struct flb_input_routes { + flb_sds_t input_name; + struct cfl_list processors; + struct cfl_list routes; + struct cfl_list _head; +}; + int flb_router_connect(struct flb_input_instance *in, struct flb_output_instance *out); int flb_router_connect_direct(struct flb_input_instance *in, @@ -65,4 +125,25 @@ int flb_router_match(const char *tag, int tag_len, const char *match, void *match_regex); int flb_router_io_set(struct flb_config *config); void flb_router_exit(struct flb_config *config); + +uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk); + +int flb_route_condition_eval(struct flb_event_chunk *chunk, + struct flb_route *route); +int flb_condition_eval_logs(struct flb_event_chunk *chunk, + struct flb_route *route); +int flb_condition_eval_metrics(struct flb_event_chunk *chunk, + struct flb_route *route); +int flb_condition_eval_traces(struct flb_event_chunk *chunk, + struct flb_route *route); + +struct flb_cf; + +int flb_router_config_parse(struct flb_cf *cf, + struct cfl_list *input_routes, + struct flb_config *config); +void flb_router_routes_destroy(struct cfl_list *input_routes); +int flb_router_apply_config(struct flb_config *config); + #endif + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 13bc3e4418f..c00a1bb13c5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -54,6 +54,8 @@ set(src flb_upstream_ha.c flb_upstream_node.c flb_router.c + flb_router_condition.c + flb_router_config.c flb_worker.c flb_coro.c flb_time.c diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c index 46174e7f242..e5e220cc60c 100644 --- a/src/config_format/flb_cf_yaml.c +++ b/src/config_format/flb_cf_yaml.c @@ -754,7 +754,9 @@ static enum status state_copy_into_properties(struct parser_state *state, struct struct cfl_kvpair *kvp; struct cfl_variant *var; struct cfl_array *arr; - int idx; + size_t idx; + size_t entry_count; + int array_all_strings; cfl_list_foreach(head, &state->keyvals->list) { kvp = cfl_list_entry(head, struct cfl_kvpair, _head); @@ -773,33 +775,57 @@ static enum status state_copy_into_properties(struct parser_state *state, struct } break; case CFL_VARIANT_ARRAY: - arr = flb_cf_section_property_add_list(conf, properties, - kvp->key, cfl_sds_len(kvp->key)); + entry_count = kvp->val->data.as_array->entry_count; + array_all_strings = 1; - if (arr == NULL) { - flb_error("unable to add property list"); - return YAML_FAILURE; - } - for (idx = 0; idx < kvp->val->data.as_array->entry_count; idx++) { + for (idx = 0; idx < entry_count; idx++) { var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx); + if (var == NULL || var->type != CFL_VARIANT_STRING) { + array_all_strings = 0; + break; + } + } + + if (array_all_strings == 1) { + arr = flb_cf_section_property_add_list(conf, properties, + kvp->key, cfl_sds_len(kvp->key)); - if (var == NULL) { - flb_error("unable to retrieve from array by index"); + if (arr == NULL) { + flb_error("unable to add property list"); return YAML_FAILURE; } - switch (var->type) { - case CFL_VARIANT_STRING: + + for (idx = 0; idx < entry_count; idx++) { + var = cfl_array_fetch_by_index(kvp->val->data.as_array, idx); if (cfl_array_append_string(arr, var->data.as_string) < 0) { flb_error("unable to append string to array"); return YAML_FAILURE; } - break; - default: - flb_error("unable to copy value for property"); + } + } + else { + if (flb_cf_section_property_add_variant(conf, + properties, + kvp->key, + cfl_sds_len(kvp->key), + kvp->val) == NULL) { + flb_error("unable to add variant property"); return YAML_FAILURE; } + kvp->val = NULL; + } + break; + case CFL_VARIANT_KVLIST: + if (flb_cf_section_property_add_variant(conf, + properties, + kvp->key, + cfl_sds_len(kvp->key), + kvp->val) == NULL) { + flb_error("unable to add variant property"); + return YAML_FAILURE; } + kvp->val = NULL; break; default: flb_error("unknown value type for properties: %d", kvp->val->type); @@ -1990,6 +2016,10 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, if (state->section == SECTION_PROCESSOR) { state = state_push_variant(ctx, state, 0); } + else if (strcmp(state->key, "routes") == 0 || + strcmp(state->key, "processors") == 0) { + state = state_push_variant(ctx, state, 0); + } else { state = state_push_witharr(ctx, state, STATE_PLUGIN_VAL_LIST); } @@ -2001,6 +2031,29 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, break; case YAML_MAPPING_START_EVENT: + if (strcmp(state->key, "processors") == 0) { + struct flb_cf_group *group; + + group = flb_cf_group_create(conf, state->cf_section, + state->key, + strlen(state->key)); + + if (group == NULL) { + flb_error("unable to create processors group"); + return YAML_FAILURE; + } + + state->cf_group = group; + state = state_push(ctx, STATE_INPUT_PROCESSORS); + + if (state == NULL) { + flb_error("unable to allocate state"); + return YAML_FAILURE; + } + + break; + } + if (state->section == SECTION_PROCESSOR) { /* when in a processor section, we allow plugins to have nested * properties which are returned as a cfl_variant */ @@ -2013,17 +2066,14 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx, break; } - if (strcmp(state->key, "processors") == 0) { - state = state_push(ctx, STATE_INPUT_PROCESSORS); + if (strcmp(state->key, "routes") == 0 || + strcmp(state->key, "processors") == 0) { + state = state_push_variant(ctx, state, 1); if (state == NULL) { flb_error("unable to allocate state"); return YAML_FAILURE; } - - if (state_create_group(conf, state, "processors") == YAML_FAILURE) { - return YAML_FAILURE; - } break; } diff --git a/src/config_format/flb_config_format.c b/src/config_format/flb_config_format.c index 28486d6a257..e6d55bf29c1 100644 --- a/src/config_format/flb_config_format.c +++ b/src/config_format/flb_config_format.c @@ -325,6 +325,44 @@ struct cfl_variant *flb_cf_section_property_add(struct flb_cf *cf, return NULL; } +struct cfl_variant *flb_cf_section_property_add_variant(struct flb_cf *cf, + struct cfl_kvlist *kv_list, + char *k_buf, size_t k_len, + struct cfl_variant *variant) +{ + int rc; + flb_sds_t key; + + if (variant == NULL) { + return NULL; + } + + if (k_len == 0) { + k_len = strlen(k_buf); + } + + key = flb_cf_key_translate(cf, k_buf, k_len); + if (key == NULL) { + return NULL; + } + + rc = flb_sds_trim(key); + if (rc == -1) { + flb_cf_error_set(cf, FLB_CF_ERROR_KV_INVALID_KEY); + flb_sds_destroy(key); + return NULL; + } + + rc = cfl_kvlist_insert(kv_list, key, variant); + if (rc < 0) { + flb_sds_destroy(key); + return NULL; + } + + flb_sds_destroy(key); + return variant; +} + struct cfl_array *flb_cf_section_property_add_list(struct flb_cf *cf, struct cfl_kvlist *kv_list, char *k_buf, size_t k_len) diff --git a/src/flb_config.c b/src/flb_config.c index ddfdd010a1c..268ce8ce9d0 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -44,6 +44,7 @@ #include #include #include +#include const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL"; @@ -367,6 +368,7 @@ struct flb_config *flb_config_init() mk_list_init(&config->filters); mk_list_init(&config->outputs); mk_list_init(&config->proxies); + cfl_list_init(&config->input_routes); mk_list_init(&config->workers); mk_list_init(&config->upstreams); mk_list_init(&config->downstreams); @@ -613,6 +615,9 @@ void flb_config_exit(struct flb_config *config) flb_config_task_map_resize(config, 0); flb_routes_empty_mask_destroy(config); + /* Clean up router input routes */ + flb_router_routes_destroy(&config->input_routes); + flb_free(config); } @@ -856,6 +861,9 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf, if (strcasecmp(kv->key, "name") == 0) { continue; } + if (strcasecmp(kv->key, "routes") == 0) { + continue; + } /* set ret to -1 to ensure that we treat any unhandled plugin or * value types as errors. @@ -960,6 +968,7 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf, int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) { int ret; + flb_debug("[config] starting configuration loading"); struct flb_kv *kv; struct mk_list *head; struct cfl_kvpair *ckv; @@ -1061,6 +1070,13 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) return -1; } + /* Parse new router configuration */ + ret = flb_router_config_parse(cf, &config->input_routes, config); + if (ret == -1) { + flb_debug("[router] router configuration parsing failed"); + return -1; + } + return 0; } diff --git a/src/flb_router.c b/src/flb_router.c index f6bf24774d6..9312f5ffed0 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -256,6 +256,15 @@ int flb_router_io_set(struct flb_config *config) } } + /* Apply new router configuration if available */ + if (!cfl_list_is_empty(&config->input_routes)) { + flb_debug("[router] new router configuration found, applying..."); + if (flb_router_apply_config(config) == -1) { + flb_error("[router] failed to apply new router configuration"); + return -1; + } + } + return 0; } diff --git a/src/flb_router_condition.c b/src/flb_router_condition.c new file mode 100644 index 00000000000..318227a094a --- /dev/null +++ b/src/flb_router_condition.c @@ -0,0 +1,123 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk) +{ + if (!chunk) { + return 0; + } + + switch (chunk->type) { + case FLB_EVENT_TYPE_LOGS: + return FLB_ROUTER_SIGNAL_LOGS; + case FLB_EVENT_TYPE_METRICS: + return FLB_ROUTER_SIGNAL_METRICS; + case FLB_EVENT_TYPE_TRACES: + return FLB_ROUTER_SIGNAL_TRACES; + default: + break; + } + + return 0; +} + +int flb_condition_eval_logs(struct flb_event_chunk *chunk, + struct flb_route *route) +{ + (void) chunk; + (void) route; + + /* + * The full condition evaluation engine requires field resolvers that map + * record accessors to the different telemetry payload shapes. The wiring + * of those resolvers is part of a bigger effort and will be implemented in + * follow-up changes. For the time being we simply report that the + * condition failed so that the runtime can rely on explicit default + * routes. + */ + return FLB_FALSE; +} + +int flb_condition_eval_metrics(struct flb_event_chunk *chunk, + struct flb_route *route) +{ + (void) chunk; + (void) route; + + return FLB_FALSE; +} + +int flb_condition_eval_traces(struct flb_event_chunk *chunk, + struct flb_route *route) +{ + (void) chunk; + (void) route; + + return FLB_FALSE; +} + +int flb_route_condition_eval(struct flb_event_chunk *chunk, + struct flb_route *route) +{ + uint32_t signal; + + if (!route) { + return FLB_FALSE; + } + + if (!route->condition) { + return FLB_TRUE; + } + + signal = flb_router_signal_from_chunk(chunk); + if (signal == 0) { + return FLB_FALSE; + } + + if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && + ((route->signals & signal) == 0)) { + return FLB_FALSE; + } + + if (route->condition->is_default) { + return FLB_TRUE; + } + + if (cfl_list_is_empty(&route->condition->rules) == 0) { + return FLB_TRUE; + } + + switch (signal) { + case FLB_ROUTER_SIGNAL_LOGS: + return flb_condition_eval_logs(chunk, route); + case FLB_ROUTER_SIGNAL_METRICS: + return flb_condition_eval_metrics(chunk, route); + case FLB_ROUTER_SIGNAL_TRACES: + return flb_condition_eval_traces(chunk, route); + default: + break; + } + + return FLB_FALSE; +} + diff --git a/src/flb_router_config.c b/src/flb_router_config.c new file mode 100644 index 00000000000..0513a11afd2 --- /dev/null +++ b/src/flb_router_config.c @@ -0,0 +1,1244 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#ifndef _WIN32 +#include +#endif + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +static flb_sds_t copy_from_cfl_sds(cfl_sds_t value) +{ + if (!value) { + return NULL; + } + + return flb_sds_create_len(value, cfl_sds_len(value)); +} + +static flb_sds_t variant_to_sds(struct cfl_variant *var) +{ + char tmp[64]; + int len; + + if (!var) { + return NULL; + } + + switch (var->type) { + case CFL_VARIANT_STRING: + return copy_from_cfl_sds(var->data.as_string); + case CFL_VARIANT_INT: + len = snprintf(tmp, sizeof(tmp), "%" PRId64, var->data.as_int64); + if (len < 0) { + return NULL; + } + return flb_sds_create_len(tmp, len); + case CFL_VARIANT_UINT: + len = snprintf(tmp, sizeof(tmp), "%" PRIu64, var->data.as_uint64); + if (len < 0) { + return NULL; + } + return flb_sds_create_len(tmp, len); + case CFL_VARIANT_DOUBLE: + len = snprintf(tmp, sizeof(tmp), "%.*g", 17, var->data.as_double); + if (len < 0) { + return NULL; + } + return flb_sds_create_len(tmp, len); + case CFL_VARIANT_BOOL: + return flb_sds_create(var->data.as_bool ? "true" : "false"); + default: + break; + } + + return NULL; +} + +static int variant_to_bool(struct cfl_variant *var, int *out) +{ + const char *val; + + if (!var || !out) { + return -1; + } + + if (var->type == CFL_VARIANT_BOOL) { + *out = var->data.as_bool != 0; + return 0; + } + else if (var->type == CFL_VARIANT_STRING && var->data.as_string) { + val = var->data.as_string; + + if (strcasecmp(val, "true") == 0) { + *out = FLB_TRUE; + return 0; + } + else if (strcasecmp(val, "false") == 0) { + *out = FLB_FALSE; + return 0; + } + } + + return -1; +} + +static int field_allowed_for_logs(const char *field) +{ + if (!field) { + return FLB_FALSE; + } + + if (strncmp(field, "$metric.", 8) == 0) { + return FLB_FALSE; + } + if (strncmp(field, "$span.", 6) == 0) { + return FLB_FALSE; + } + if (strncmp(field, "$scope.", 7) == 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int field_allowed_for_metrics(const char *field) +{ + if (!field) { + return FLB_FALSE; + } + + if (strncmp(field, "$metric.", 8) == 0) { + return FLB_TRUE; + } + if (strncmp(field, "$resource[", 10) == 0) { + return FLB_TRUE; + } + if (strncmp(field, "$attributes[", 12) == 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static int field_allowed_for_traces(const char *field) +{ + if (!field) { + return FLB_FALSE; + } + + if (strncmp(field, "$span.", 6) == 0) { + return FLB_TRUE; + } + if (strncmp(field, "$resource[", 10) == 0) { + return FLB_TRUE; + } + if (strncmp(field, "$scope[", 7) == 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static int validate_rule_field(const char *field, uint32_t signals) +{ + int ok = FLB_FALSE; + + if (!field) { + return FLB_FALSE; + } + + if (signals == FLB_ROUTER_SIGNAL_ANY) { + signals = FLB_ROUTER_SIGNAL_LOGS | + FLB_ROUTER_SIGNAL_METRICS | + FLB_ROUTER_SIGNAL_TRACES; + } + + if ((signals & FLB_ROUTER_SIGNAL_LOGS) && field_allowed_for_logs(field)) { + ok = FLB_TRUE; + } + + if ((signals & FLB_ROUTER_SIGNAL_METRICS) && + field_allowed_for_metrics(field)) { + ok = FLB_TRUE; + } + + if ((signals & FLB_ROUTER_SIGNAL_TRACES) && + field_allowed_for_traces(field)) { + ok = FLB_TRUE; + } + + return ok; +} + +static uint32_t parse_signal_key(const char *key) +{ + const char *cursor; + const char *start; + size_t len; + uint32_t mask = 0; + + if (!key) { + return 0; + } + + cursor = key; + while (*cursor) { + while (*cursor && (isspace((unsigned char) *cursor) || + *cursor == ',' || *cursor == '|' || *cursor == '+')) { + cursor++; + } + + if (*cursor == '\0') { + break; + } + + start = cursor; + while (*cursor && !isspace((unsigned char) *cursor) && + *cursor != ',' && *cursor != '|' && *cursor != '+') { + cursor++; + } + + len = cursor - start; + if (len == 0) { + continue; + } + + if (len == 4 && strncasecmp(start, "logs", len) == 0) { + mask |= FLB_ROUTER_SIGNAL_LOGS; + } + else if (len == 7 && strncasecmp(start, "metrics", len) == 0) { + mask |= FLB_ROUTER_SIGNAL_METRICS; + } + else if (len == 6 && strncasecmp(start, "traces", len) == 0) { + mask |= FLB_ROUTER_SIGNAL_TRACES; + } + else if (len == 3 && strncasecmp(start, "any", len) == 0) { + mask |= FLB_ROUTER_SIGNAL_ANY; + } + else { + return 0; + } + } + + return mask; +} + +static void route_condition_destroy(struct flb_route_condition *condition) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct flb_route_condition_rule *rule; + + if (!condition) { + return; + } + + cfl_list_foreach_safe(head, tmp, &condition->rules) { + rule = cfl_list_entry(head, struct flb_route_condition_rule, _head); + cfl_list_del(&rule->_head); + + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + if (rule->value) { + flb_sds_destroy(rule->value); + } + + flb_free(rule); + } + + flb_free(condition); +} + +static void route_outputs_destroy(struct flb_route *route) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct flb_route_output *output; + + cfl_list_foreach_safe(head, tmp, &route->outputs) { + output = cfl_list_entry(head, struct flb_route_output, _head); + cfl_list_del(&output->_head); + + if (output->name) { + flb_sds_destroy(output->name); + } + if (output->fallback) { + flb_sds_destroy(output->fallback); + } + flb_free(output); + } +} + +static void route_processors_destroy(struct cfl_list *processors) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct cfl_list *p_head; + struct cfl_list *p_tmp; + struct flb_route_processor *processor; + struct flb_route_processor_property *prop; + + if (!processors) { + return; + } + + cfl_list_foreach_safe(head, tmp, processors) { + processor = cfl_list_entry(head, struct flb_route_processor, _head); + cfl_list_del(&processor->_head); + + cfl_list_foreach_safe(p_head, p_tmp, &processor->properties) { + prop = cfl_list_entry(p_head, struct flb_route_processor_property, _head); + cfl_list_del(&prop->_head); + + if (prop->key) { + flb_sds_destroy(prop->key); + } + if (prop->value) { + flb_sds_destroy(prop->value); + } + flb_free(prop); + } + + if (processor->name) { + flb_sds_destroy(processor->name); + } + flb_free(processor); + } +} + +static void input_routes_destroy(struct flb_input_routes *input) +{ + struct cfl_list *r_head; + struct cfl_list *r_tmp; + struct flb_route *route; + + if (!input) { + return; + } + + route_processors_destroy(&input->processors); + + cfl_list_foreach_safe(r_head, r_tmp, &input->routes) { + route = cfl_list_entry(r_head, struct flb_route, _head); + cfl_list_del(&route->_head); + + if (route->condition) { + route_condition_destroy(route->condition); + } + + route_outputs_destroy(route); + route_processors_destroy(&route->processors); + + if (route->name) { + flb_sds_destroy(route->name); + } + flb_free(route); + } + + if (input->input_name) { + flb_sds_destroy(input->input_name); + } + + flb_free(input); +} + +void flb_router_routes_destroy(struct cfl_list *input_routes) +{ + struct cfl_list *head; + struct cfl_list *tmp; + struct flb_input_routes *routes; + + if (!input_routes) { + return; + } + + cfl_list_foreach_safe(head, tmp, input_routes) { + routes = cfl_list_entry(head, struct flb_input_routes, _head); + cfl_list_del(&routes->_head); + input_routes_destroy(routes); + } +} + +static int add_processor_properties(struct flb_route_processor *processor, + struct cfl_kvlist *kvlist) +{ + struct cfl_list *head; + struct cfl_kvpair *pair; + struct flb_route_processor_property *prop; + + if (!processor || !kvlist) { + return -1; + } + + cfl_list_foreach(head, &kvlist->list) { + pair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (strcmp(pair->key, "name") == 0) { + continue; + } + + prop = flb_calloc(1, sizeof(struct flb_route_processor_property)); + if (!prop) { + flb_errno(); + return -1; + } + + cfl_list_init(&prop->_head); + + prop->key = flb_sds_create_len(pair->key, cfl_sds_len(pair->key)); + if (!prop->key) { + flb_free(prop); + return -1; + } + + prop->value = variant_to_sds(pair->val); + if (!prop->value) { + flb_sds_destroy(prop->key); + flb_free(prop); + return -1; + } + + cfl_list_add(&prop->_head, &processor->properties); + } + + return 0; +} + +static int parse_processors(struct cfl_variant *variant, + struct cfl_list *out_list, + struct flb_config *config) +{ + size_t idx; + struct cfl_array *array; + struct cfl_variant *entry; + struct cfl_kvlist *kvlist; + struct cfl_variant *name_var; + struct flb_route_processor *processor; + + (void) config; + + if (!variant || !out_list) { + return -1; + } + + if (variant->type != CFL_VARIANT_ARRAY) { + return -1; + } + + array = variant->data.as_array; + for (idx = 0; idx < cfl_array_size(array); idx++) { + entry = cfl_array_fetch_by_index(array, idx); + if (!entry || entry->type != CFL_VARIANT_KVLIST) { + return -1; + } + + kvlist = entry->data.as_kvlist; + name_var = cfl_kvlist_fetch(kvlist, "name"); + if (!name_var || name_var->type != CFL_VARIANT_STRING) { + return -1; + } + + processor = flb_calloc(1, sizeof(struct flb_route_processor)); + if (!processor) { + flb_errno(); + return -1; + } + cfl_list_init(&processor->_head); + cfl_list_init(&processor->properties); + + processor->name = copy_from_cfl_sds(name_var->data.as_string); + if (!processor->name) { + flb_free(processor); + return -1; + } + + if (add_processor_properties(processor, kvlist) != 0) { + route_processors_destroy(&processor->properties); + if (processor->name) { + flb_sds_destroy(processor->name); + } + flb_free(processor); + return -1; + } + + cfl_list_add(&processor->_head, out_list); + } + + return 0; +} + +static struct flb_route_condition_rule *parse_condition_rule(struct cfl_variant *variant) +{ + struct flb_route_condition_rule *rule; + struct cfl_kvlist *kvlist; + struct cfl_variant *field_var; + struct cfl_variant *op_var; + struct cfl_variant *value_var; + + if (!variant || variant->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + kvlist = variant->data.as_kvlist; + field_var = cfl_kvlist_fetch(kvlist, "field"); + op_var = cfl_kvlist_fetch(kvlist, "op"); + value_var = cfl_kvlist_fetch(kvlist, "value"); + + if (!field_var || field_var->type != CFL_VARIANT_STRING) { + return NULL; + } + if (!op_var || op_var->type != CFL_VARIANT_STRING) { + return NULL; + } + + rule = flb_calloc(1, sizeof(struct flb_route_condition_rule)); + if (!rule) { + flb_errno(); + return NULL; + } + cfl_list_init(&rule->_head); + + rule->field = copy_from_cfl_sds(field_var->data.as_string); + if (!rule->field) { + flb_free(rule); + return NULL; + } + + rule->op = copy_from_cfl_sds(op_var->data.as_string); + if (!rule->op) { + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + + if (value_var) { + rule->value = variant_to_sds(value_var); + if (!rule->value && strcmp(rule->op, "exists") != 0) { + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + } + + return rule; +} + +static struct flb_route_condition *parse_condition(struct cfl_variant *variant, + uint32_t signals) +{ + struct flb_route_condition *condition; + struct cfl_variant *rules_var; + struct cfl_variant *default_var; + struct cfl_array *rules_array; + struct cfl_variant *entry; + struct flb_route_condition_rule *rule; + int val; + size_t idx; + + if (!variant || variant->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + condition = flb_calloc(1, sizeof(struct flb_route_condition)); + if (!condition) { + flb_errno(); + return NULL; + } + cfl_list_init(&condition->rules); + + rules_var = cfl_kvlist_fetch(variant->data.as_kvlist, "rules"); + default_var = cfl_kvlist_fetch(variant->data.as_kvlist, "default"); + + if (default_var) { + if (variant_to_bool(default_var, &val) != 0) { + route_condition_destroy(condition); + return NULL; + } + condition->is_default = val; + } + + if (rules_var) { + if (rules_var->type != CFL_VARIANT_ARRAY) { + route_condition_destroy(condition); + return NULL; + } + + rules_array = rules_var->data.as_array; + for (idx = 0; idx < cfl_array_size(rules_array); idx++) { + entry = cfl_array_fetch_by_index(rules_array, idx); + rule = parse_condition_rule(entry); + if (!rule) { + route_condition_destroy(condition); + return NULL; + } + + if (!validate_rule_field(rule->field, signals)) { + route_condition_destroy(condition); + return NULL; + } + + cfl_list_add(&rule->_head, &condition->rules); + } + } + + if (!condition->is_default && cfl_list_is_empty(&condition->rules) == 1) { + route_condition_destroy(condition); + return NULL; + } + + return condition; +} + +static int add_output_from_variant(struct flb_route *route, + struct cfl_variant *variant) +{ + struct flb_route_output *output; + struct cfl_variant *name_var; + struct cfl_variant *fallback_var; + + if (!route || !variant) { + return -1; + } + + output = flb_calloc(1, sizeof(struct flb_route_output)); + if (!output) { + flb_errno(); + return -1; + } + cfl_list_init(&output->_head); + + if (variant->type == CFL_VARIANT_STRING) { + output->name = copy_from_cfl_sds(variant->data.as_string); + if (!output->name) { + flb_free(output); + return -1; + } + } + else if (variant->type == CFL_VARIANT_KVLIST) { + name_var = cfl_kvlist_fetch(variant->data.as_kvlist, "name"); + if (!name_var || name_var->type != CFL_VARIANT_STRING) { + flb_free(output); + return -1; + } + output->name = copy_from_cfl_sds(name_var->data.as_string); + if (!output->name) { + flb_free(output); + return -1; + } + + fallback_var = cfl_kvlist_fetch(variant->data.as_kvlist, "fallback"); + if (fallback_var) { + if (fallback_var->type != CFL_VARIANT_STRING) { + flb_sds_destroy(output->name); + flb_free(output); + return -1; + } + output->fallback = copy_from_cfl_sds(fallback_var->data.as_string); + if (!output->fallback) { + flb_sds_destroy(output->name); + flb_free(output); + return -1; + } + } + } + else { + flb_free(output); + return -1; + } + + cfl_list_add(&output->_head, &route->outputs); + return 0; +} + +static int parse_outputs(struct cfl_variant *variant, struct flb_route *route) +{ + size_t idx; + struct cfl_array *array; + struct cfl_variant *entry; + + if (!variant || !route) { + return -1; + } + + if (variant->type == CFL_VARIANT_ARRAY) { + array = variant->data.as_array; + + if (cfl_array_size(array) == 0) { + return -1; + } + + for (idx = 0; idx < cfl_array_size(array); idx++) { + entry = cfl_array_fetch_by_index(array, idx); + if (!entry) { + return -1; + } + + if (add_output_from_variant(route, entry) != 0) { + return -1; + } + } + + return 0; + } + + if (add_output_from_variant(route, variant) != 0) { + return -1; + } + + return 0; +} + +static int route_name_exists(struct cfl_list *routes, flb_sds_t name) +{ + struct cfl_list *head; + struct flb_route *route; + + if (!routes || !name) { + return FLB_FALSE; + } + + cfl_list_foreach(head, routes) { + route = cfl_list_entry(head, struct flb_route, _head); + if (route->name && strcmp(route->name, name) == 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int parse_route(struct cfl_variant *variant, + struct flb_input_routes *input, + struct flb_config *config, + uint32_t signals) +{ + struct flb_route *route; + struct cfl_kvlist *kvlist; + struct cfl_variant *name_var; + struct cfl_variant *condition_var; + struct cfl_variant *processors_var; + struct cfl_variant *to_var; + struct cfl_variant *outputs_var; + + (void) config; + + if (!variant || variant->type != CFL_VARIANT_KVLIST) { + return -1; + } + + if (signals == 0) { + return -1; + } + + kvlist = variant->data.as_kvlist; + + name_var = cfl_kvlist_fetch(kvlist, "name"); + if (!name_var || name_var->type != CFL_VARIANT_STRING) { + return -1; + } + + route = flb_calloc(1, sizeof(struct flb_route)); + if (!route) { + flb_errno(); + return -1; + } + cfl_list_init(&route->_head); + cfl_list_init(&route->outputs); + cfl_list_init(&route->processors); + route->signals = signals; + + route->name = copy_from_cfl_sds(name_var->data.as_string); + if (!route->name) { + flb_free(route); + return -1; + } + + if (route_name_exists(&input->routes, route->name)) { + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + + + condition_var = cfl_kvlist_fetch(kvlist, "condition"); + if (condition_var) { + route->condition = parse_condition(condition_var, route->signals); + if (!route->condition) { + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + } + + processors_var = cfl_kvlist_fetch(kvlist, "processors"); + if (processors_var) { + if (parse_processors(processors_var, &route->processors, config) != 0) { + if (route->condition) { + route_condition_destroy(route->condition); + } + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + } + + to_var = cfl_kvlist_fetch(kvlist, "to"); + if (!to_var) { + if (route->condition) { + route_condition_destroy(route->condition); + } + route_processors_destroy(&route->processors); + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + + if (to_var->type == CFL_VARIANT_KVLIST) { + outputs_var = cfl_kvlist_fetch(to_var->data.as_kvlist, "outputs"); + } + else { + outputs_var = to_var; + } + + if (!outputs_var || parse_outputs(outputs_var, route) != 0 || + cfl_list_is_empty(&route->outputs) == 1) { + if (route->condition) { + route_condition_destroy(route->condition); + } + route_processors_destroy(&route->processors); + route_outputs_destroy(route); + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + + cfl_list_add(&route->_head, &input->routes); + return 0; +} + +static int parse_routes_block(struct cfl_variant *variant, + struct flb_input_routes *input, + struct flb_config *config, + uint32_t signals) +{ + struct cfl_array *array; + struct cfl_variant *entry; + size_t idx; + + if (!variant) { + return -1; + } + + if (variant->type == CFL_VARIANT_ARRAY) { + array = variant->data.as_array; + if (cfl_array_size(array) == 0) { + return -1; + } + + for (idx = 0; idx < cfl_array_size(array); idx++) { + entry = cfl_array_fetch_by_index(array, idx); + if (!entry) { + return -1; + } + + if (parse_route(entry, input, config, signals) != 0) { + return -1; + } + } + + return 0; + } + + if (variant->type == CFL_VARIANT_KVLIST) { + if (parse_route(variant, input, config, signals) != 0) { + return -1; + } + return 0; + } + + return -1; +} + +static int parse_input_section(struct flb_cf_section *section, + struct cfl_list *input_routes, + struct flb_config *config) +{ + struct flb_input_routes *input; + struct cfl_kvlist *kvlist; + struct cfl_variant *name_var; + struct cfl_variant *processors_var; + struct cfl_variant *routes_var; + struct cfl_kvlist *routes_kvlist; + struct cfl_list *head; + struct cfl_kvpair *pair; + uint32_t mask; + size_t before_count; + + if (!section || !input_routes) { + return -1; + } + + kvlist = section->properties; + if (!kvlist) { + return -1; + } + + routes_var = cfl_kvlist_fetch(kvlist, "routes"); + if (!routes_var) { + /* No router configuration for this input section */ + return 0; + } + + if (routes_var->type != CFL_VARIANT_KVLIST) { + return -1; + } + + routes_kvlist = routes_var->data.as_kvlist; + if (cfl_list_is_empty(&routes_kvlist->list) == 1) { + /* routes field present but empty, nothing to configure */ + return 0; + } + + name_var = cfl_kvlist_fetch(kvlist, "name"); + if (!name_var || name_var->type != CFL_VARIANT_STRING) { + return -1; + } + + input = flb_calloc(1, sizeof(struct flb_input_routes)); + if (!input) { + flb_errno(); + return -1; + } + + cfl_list_init(&input->_head); + cfl_list_init(&input->processors); + cfl_list_init(&input->routes); + + input->input_name = copy_from_cfl_sds(name_var->data.as_string); + if (!input->input_name) { + flb_free(input); + return -1; + } + + processors_var = cfl_kvlist_fetch(kvlist, "processors"); + if (processors_var) { + if (parse_processors(processors_var, &input->processors, config) != 0) { + goto error; + } + } + + cfl_list_foreach(head, &routes_kvlist->list) { + pair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (!pair || !pair->key) { + goto error; + } + + mask = parse_signal_key(pair->key); + if (mask == 0) { + goto error; + } + + if (!pair->val) { + goto error; + } + + before_count = cfl_list_size(&input->routes); + if (parse_routes_block(pair->val, input, config, mask) != 0 || + cfl_list_size(&input->routes) == before_count) { + goto error; + } + } + + if (cfl_list_is_empty(&input->routes) == 1) { + goto skip; + } + + cfl_list_add(&input->_head, input_routes); + return 1; + +error: + input_routes_destroy(input); + return -1; + +skip: + input_routes_destroy(input); + return 0; +} + +int flb_router_config_parse(struct flb_cf *cf, + struct cfl_list *input_routes, + struct flb_config *config) +{ + struct mk_list *head; + struct flb_cf_section *section; + int routes_found = FLB_FALSE; + int ret; + + if (!cf || !input_routes) { + return -1; + } + + cfl_list_init(input_routes); + + mk_list_foreach(head, &cf->inputs) { + section = mk_list_entry(head, struct flb_cf_section, _head_section); + ret = parse_input_section(section, input_routes, config); + if (ret == -1) { + flb_router_routes_destroy(input_routes); + cfl_list_init(input_routes); + return -1; + } + else if (ret == 1) { + routes_found = FLB_TRUE; + } + } + + if (cfl_list_is_empty(input_routes) == 1) { + if (routes_found == FLB_TRUE) { + flb_router_routes_destroy(input_routes); + cfl_list_init(input_routes); + return -1; + } + return 0; + } + + return 0; +} + +/* Apply parsed router configuration to actual input/output instances */ +static struct flb_input_instance *find_input_instance(struct flb_config *config, + flb_sds_t name) +{ + struct mk_list *head; + struct flb_input_instance *ins; + + if (!config || !name) { + return NULL; + } + + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + + if (!ins->p) { + continue; + } + + if (ins->alias && strcmp(ins->alias, name) == 0) { + return ins; + } + + if (strcmp(ins->name, name) == 0) { + return ins; + } + + if (ins->p->name && strcmp(ins->p->name, name) == 0) { + return ins; + } + } + + return NULL; +} + +static struct flb_output_instance *find_output_instance(struct flb_config *config, + flb_sds_t name) +{ + struct mk_list *head; + struct flb_output_instance *ins; + + if (!config || !name) { + return NULL; + } + + mk_list_foreach(head, &config->outputs) { + ins = mk_list_entry(head, struct flb_output_instance, _head); + + if (!ins->p) { + continue; + } + + if (ins->alias && strcmp(ins->alias, name) == 0) { + return ins; + } + + if (strcmp(ins->name, name) == 0) { + return ins; + } + + if (ins->p->name && strcmp(ins->p->name, name) == 0) { + return ins; + } + } + + return NULL; +} + +static int input_has_direct_route(struct flb_input_instance *in, + struct flb_output_instance *out) +{ + struct mk_list *head; + struct flb_router_path *path; + + if (!in || !out) { + return FLB_FALSE; + } + + mk_list_foreach(head, &in->routes_direct) { + path = mk_list_entry(head, struct flb_router_path, _head); + if (path->ins == out) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int output_supports_signals(struct flb_output_instance *out, uint32_t signals) +{ + if (!out) { + return FLB_FALSE; + } + + /* Handle ANY signal - if ANY is present in the bitmask, allow all signals */ + if (signals == 0 || (signals & FLB_ROUTER_SIGNAL_ANY)) { + return FLB_TRUE; + } + + /* Require support for all requested signal bits */ + if ((signals & FLB_ROUTER_SIGNAL_LOGS) && !(out->event_type & FLB_OUTPUT_LOGS)) { + return FLB_FALSE; + } + if ((signals & FLB_ROUTER_SIGNAL_METRICS) && !(out->event_type & FLB_OUTPUT_METRICS)) { + return FLB_FALSE; + } + if ((signals & FLB_ROUTER_SIGNAL_TRACES) && !(out->event_type & FLB_OUTPUT_TRACES)) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +int flb_router_apply_config(struct flb_config *config) +{ + struct cfl_list *input_head; + struct cfl_list *route_head; + struct cfl_list *output_head; + struct flb_input_routes *input_routes; + struct flb_route *route; + struct flb_route_output *route_output; + struct flb_input_instance *input_ins; + struct flb_output_instance *output_ins; + struct flb_output_instance *fallback_ins; + int created; + + if (!config) { + return 0; + } + + flb_debug("[router] applying router configuration"); + created = 0; + + cfl_list_foreach(input_head, &config->input_routes) { + input_routes = cfl_list_entry(input_head, struct flb_input_routes, _head); + + input_ins = find_input_instance(config, input_routes->input_name); + if (!input_ins) { + flb_warn("[router] could not find input instance '%s' for routes", + input_routes->input_name ? input_routes->input_name : "(null)"); + continue; + } + + cfl_list_foreach(route_head, &input_routes->routes) { + route = cfl_list_entry(route_head, struct flb_route, _head); + + cfl_list_foreach(output_head, &route->outputs) { + route_output = cfl_list_entry(output_head, struct flb_route_output, _head); + + output_ins = find_output_instance(config, route_output->name); + fallback_ins = NULL; + + if (!output_ins && route_output->fallback) { + fallback_ins = find_output_instance(config, route_output->fallback); + if (fallback_ins) { + flb_warn("[router] output '%s' not found, using fallback '%s'", + route_output->name, route_output->fallback); + output_ins = fallback_ins; + } + } + + if (!output_ins) { + flb_warn("[router] could not find output instance '%s' for route '%s/%s'", + route_output->name ? route_output->name : "(null)", + input_routes->input_name ? input_routes->input_name : "(null)", + route->name ? route->name : "(unnamed)"); + continue; + } + + if (!output_supports_signals(output_ins, route->signals)) { + flb_warn("[router] output '%s' does not support required signals for route '%s/%s'", + flb_output_name(output_ins), + input_routes->input_name ? input_routes->input_name : "(null)", + route->name ? route->name : "(unnamed)"); + continue; + } + + if (input_has_direct_route(input_ins, output_ins)) { + continue; + } + + if (flb_router_connect_direct(input_ins, output_ins) == 0) { + created++; + flb_debug("[router] connected input '%s' route '%s' to output '%s'", + flb_input_name(input_ins), + route->name ? route->name : "(unnamed)", + flb_output_name(output_ins)); + } + else { + flb_error("[router] failed to connect input '%s' to output '%s'", + flb_input_name(input_ins), + flb_output_name(output_ins)); + } + } + } + } + + if (created == 0) { + flb_debug("[router] no direct routes were connected from configuration"); + } + + return 0; +} + diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index a270f1dc3c8..0c8e984ed8f 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -13,6 +13,7 @@ set(UNIT_TESTS_FILES kv.c slist.c router.c + router_config.c network.c unit_sizes.c hashtable.c diff --git a/tests/internal/data/config_format/yaml/routing/basic.yaml b/tests/internal/data/config_format/yaml/routing/basic.yaml new file mode 100644 index 00000000000..a7597565e2e --- /dev/null +++ b/tests/internal/data/config_format/yaml/routing/basic.yaml @@ -0,0 +1,38 @@ +--- +pipeline: + inputs: + - name: opentelemetry + processors: + - name: parser + parser: json + routes: + logs: + - name: error_logs + condition: + rules: + - field: "$level" + op: eq + value: "error" + to: + outputs: + - name: loki + fallback: s3_backup + - name: default + condition: + default: true + to: + outputs: + - name: elasticsearch + metrics: + - name: metrics_above_threshold + condition: + rules: + - field: "$metric.name" + op: regex + value: "^cpu_" + - field: "$metric.value" + op: gt + value: 0.9 + to: + outputs: + - name: prometheus_remote diff --git a/tests/internal/data/config_format/yaml/routing/metrics.yaml b/tests/internal/data/config_format/yaml/routing/metrics.yaml new file mode 100644 index 00000000000..7aaa6e2030d --- /dev/null +++ b/tests/internal/data/config_format/yaml/routing/metrics.yaml @@ -0,0 +1,19 @@ +--- +pipeline: + inputs: + - name: metrics + routes: + metrics: + - name: cpu_hot + condition: + rules: + - field: "$metric.name" + op: eq + value: "cpu_usage" + - field: "$resource['service.name']" + op: eq + value: "checkout" + to: + outputs: + - name: prometheus_remote + fallback: s3_backup diff --git a/tests/internal/data/config_format/yaml/routing/multi_signal.yaml b/tests/internal/data/config_format/yaml/routing/multi_signal.yaml new file mode 100644 index 00000000000..46553e9e6e2 --- /dev/null +++ b/tests/internal/data/config_format/yaml/routing/multi_signal.yaml @@ -0,0 +1,27 @@ +--- +pipeline: + inputs: + - name: telemetry + routes: + "logs,traces": + - name: service_checkout + condition: + rules: + - field: "$resource['service.name']" + op: eq + value: "checkout" + - field: "$span.duration_ms" + op: gt + value: 2000 + to: + outputs: + - name: tempo + logs: + - name: catch_all + condition: + default: true + to: + outputs: + - name: loki + - name: s3_archive + fallback: glacier diff --git a/tests/internal/data/config_format/yaml/routing/precedence.yaml b/tests/internal/data/config_format/yaml/routing/precedence.yaml new file mode 100644 index 00000000000..b8df272fcc8 --- /dev/null +++ b/tests/internal/data/config_format/yaml/routing/precedence.yaml @@ -0,0 +1,15 @@ +--- +pipeline: + inputs: + - name: lib + routes: + logs: + - name: default_route + condition: + default: true + to: + outputs: + - name: lib_route + outputs: + - name: lib_route + type: lib diff --git a/tests/internal/router_config.c b/tests/internal/router_config.c new file mode 100644 index 00000000000..c479c978021 --- /dev/null +++ b/tests/internal/router_config.c @@ -0,0 +1,863 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "flb_tests_internal.h" + +#ifdef _WIN32 +#define FLB_ROUTER_TEST_FILE(name) \ + FLB_TESTS_DATA_PATH "\\data\\config_format\\yaml\\routing\\" name +#else +#define FLB_ROUTER_TEST_FILE(name) \ + FLB_TESTS_DATA_PATH "/data/config_format/yaml/routing/" name +#endif + +static struct cfl_variant *clone_variant(struct cfl_variant *var); + +static struct cfl_array *clone_array(struct cfl_array *array) +{ + struct cfl_array *copy; + struct cfl_variant *entry; + struct cfl_variant *entry_copy; + size_t idx; + + if (!array) { + return NULL; + } + + copy = cfl_array_create(cfl_array_size(array)); + if (!copy) { + return NULL; + } + + for (idx = 0; idx < cfl_array_size(array); idx++) { + entry = cfl_array_fetch_by_index(array, idx); + entry_copy = clone_variant(entry); + if (!entry_copy) { + cfl_array_destroy(copy); + return NULL; + } + + if (cfl_array_append(copy, entry_copy) != 0) { + cfl_variant_destroy(entry_copy); + cfl_array_destroy(copy); + return NULL; + } + } + + return copy; +} + +static struct cfl_kvlist *clone_kvlist(struct cfl_kvlist *kvlist) +{ + struct cfl_kvlist *copy; + struct cfl_list *head; + struct cfl_kvpair *pair; + struct cfl_variant *value_copy; + + if (!kvlist) { + return NULL; + } + + copy = cfl_kvlist_create(); + if (!copy) { + return NULL; + } + + cfl_list_foreach(head, &kvlist->list) { + pair = cfl_list_entry(head, struct cfl_kvpair, _head); + value_copy = clone_variant(pair->val); + if (!value_copy) { + cfl_kvlist_destroy(copy); + return NULL; + } + + if (cfl_kvlist_insert_s(copy, + pair->key, + cfl_sds_len(pair->key), + value_copy) != 0) { + cfl_variant_destroy(value_copy); + cfl_kvlist_destroy(copy); + return NULL; + } + } + + return copy; +} + +static struct cfl_variant *clone_variant(struct cfl_variant *var) +{ + struct cfl_array *array_copy; + struct cfl_kvlist *kvlist_copy; + int referenced; + + array_copy = NULL; + kvlist_copy = NULL; + referenced = CFL_FALSE; + + if (!var) { + return NULL; + } + + switch (var->type) { + case CFL_VARIANT_STRING: + referenced = (var->referenced == CFL_TRUE) ? CFL_TRUE : CFL_FALSE; + return cfl_variant_create_from_string_s(var->data.as_string, + cfl_sds_len(var->data.as_string), + referenced); + case CFL_VARIANT_BYTES: + referenced = (var->referenced == CFL_TRUE) ? CFL_TRUE : CFL_FALSE; + return cfl_variant_create_from_bytes(var->data.as_bytes, + cfl_variant_size_get(var), + referenced); + case CFL_VARIANT_BOOL: + return cfl_variant_create_from_bool(var->data.as_bool); + case CFL_VARIANT_INT: + return cfl_variant_create_from_int64(var->data.as_int64); + case CFL_VARIANT_UINT: + return cfl_variant_create_from_uint64(var->data.as_uint64); + case CFL_VARIANT_DOUBLE: + return cfl_variant_create_from_double(var->data.as_double); + case CFL_VARIANT_NULL: + return cfl_variant_create_from_null(); + case CFL_VARIANT_ARRAY: + array_copy = clone_array(var->data.as_array); + if (!array_copy) { + return NULL; + } + return cfl_variant_create_from_array(array_copy); + case CFL_VARIANT_KVLIST: + kvlist_copy = clone_kvlist(var->data.as_kvlist); + if (!kvlist_copy) { + return NULL; + } + return cfl_variant_create_from_kvlist(kvlist_copy); + case CFL_VARIANT_REFERENCE: + return cfl_variant_create_from_reference(var->data.as_reference); + default: + break; + } + + return NULL; +} + +static struct flb_cf *cf_from_inputs_variant(struct cfl_variant *inputs) +{ + struct flb_cf *cf; + struct cfl_array *array; + size_t idx; + + if (!inputs || inputs->type != CFL_VARIANT_ARRAY) { + return NULL; + } + + cf = flb_cf_create(); + if (!cf) { + return NULL; + } + + array = inputs->data.as_array; + for (idx = 0; idx < cfl_array_size(array); idx++) { + struct cfl_variant *entry; + struct cfl_kvlist *copy; + struct flb_cf_section *section; + + entry = cfl_array_fetch_by_index(array, idx); + if (!entry || entry->type != CFL_VARIANT_KVLIST) { + flb_cf_destroy(cf); + return NULL; + } + + copy = clone_kvlist(entry->data.as_kvlist); + if (!copy) { + flb_cf_destroy(cf); + return NULL; + } + + section = flb_cf_section_create(cf, "input", 5); + if (!section) { + cfl_kvlist_destroy(copy); + flb_cf_destroy(cf); + return NULL; + } + + cfl_kvlist_destroy(section->properties); + section->properties = copy; + } + + return cf; +} + +static struct flb_cf *load_cf_from_yaml(const char *path) +{ + return flb_cf_yaml_create(NULL, (char *) path, NULL, 0); +} + +static struct cfl_variant *create_inputs_variant() +{ + struct cfl_array *inputs; + struct cfl_kvlist *input; + struct cfl_array *processors; + struct cfl_kvlist *proc; + struct cfl_kvlist *routes; + struct cfl_array *log_routes; + struct cfl_kvlist *route; + struct cfl_kvlist *condition; + struct cfl_array *rules; + struct cfl_kvlist *rule_kv; + struct cfl_variant *rule_variant; + struct cfl_array *outputs; + struct cfl_kvlist *to; + struct cfl_kvlist *output_obj; + struct cfl_kvlist *default_condition; + struct cfl_variant *inputs_variant; + + inputs = cfl_array_create(1); + TEST_CHECK(inputs != NULL); + if (!inputs) { + return NULL; + } + + input = cfl_kvlist_create(); + TEST_CHECK(input != NULL); + if (!input) { + cfl_array_destroy(inputs); + return NULL; + } + + TEST_CHECK(cfl_kvlist_insert_string(input, "name", "opentelemetry") == 0); + + processors = cfl_array_create(1); + TEST_CHECK(processors != NULL); + proc = cfl_kvlist_create(); + TEST_CHECK(proc != NULL); + TEST_CHECK(cfl_kvlist_insert_string(proc, "name", "parser") == 0); + TEST_CHECK(cfl_kvlist_insert_string(proc, "parser", "json") == 0); + TEST_CHECK(cfl_array_append(processors, cfl_variant_create_from_kvlist(proc)) == 0); + TEST_CHECK(cfl_kvlist_insert_array(input, "processors", processors) == 0); + + routes = cfl_kvlist_create(); + TEST_CHECK(routes != NULL); + log_routes = cfl_array_create(2); + TEST_CHECK(log_routes != NULL); + + /* error_logs route */ + route = cfl_kvlist_create(); + TEST_CHECK(route != NULL); + TEST_CHECK(cfl_kvlist_insert_string(route, "name", "error_logs") == 0); + + condition = cfl_kvlist_create(); + TEST_CHECK(condition != NULL); + rules = cfl_array_create(1); + TEST_CHECK(rules != NULL); + + rule_kv = cfl_kvlist_create(); + TEST_CHECK(rule_kv != NULL); + TEST_CHECK(cfl_kvlist_insert_string(rule_kv, "field", "$level") == 0); + TEST_CHECK(cfl_kvlist_insert_string(rule_kv, "op", "eq") == 0); + TEST_CHECK(cfl_kvlist_insert_string(rule_kv, "value", "error") == 0); + rule_variant = cfl_variant_create_from_kvlist(rule_kv); + TEST_CHECK(rule_variant != NULL); + TEST_CHECK(cfl_array_append(rules, rule_variant) == 0); + + TEST_CHECK(cfl_kvlist_insert_array(condition, "rules", rules) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "condition", condition) == 0); + + outputs = cfl_array_create(1); + TEST_CHECK(outputs != NULL); + TEST_CHECK(cfl_array_append_string(outputs, "loki") == 0); + to = cfl_kvlist_create(); + TEST_CHECK(to != NULL); + TEST_CHECK(cfl_kvlist_insert_array(to, "outputs", outputs) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "to", to) == 0); + + TEST_CHECK(cfl_array_append(log_routes, cfl_variant_create_from_kvlist(route)) == 0); + + /* default route */ + route = cfl_kvlist_create(); + TEST_CHECK(route != NULL); + TEST_CHECK(cfl_kvlist_insert_string(route, "name", "default") == 0); + + default_condition = cfl_kvlist_create(); + TEST_CHECK(default_condition != NULL); + TEST_CHECK(cfl_kvlist_insert_bool(default_condition, "default", 1) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "condition", default_condition) == 0); + + outputs = cfl_array_create(1); + TEST_CHECK(outputs != NULL); + output_obj = cfl_kvlist_create(); + TEST_CHECK(output_obj != NULL); + TEST_CHECK(cfl_kvlist_insert_string(output_obj, "name", "elasticsearch") == 0); + TEST_CHECK(cfl_kvlist_insert_string(output_obj, "fallback", "s3_backup") == 0); + TEST_CHECK(cfl_array_append(outputs, cfl_variant_create_from_kvlist(output_obj)) == 0); + + to = cfl_kvlist_create(); + TEST_CHECK(to != NULL); + TEST_CHECK(cfl_kvlist_insert_array(to, "outputs", outputs) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "to", to) == 0); + + TEST_CHECK(cfl_array_append(log_routes, cfl_variant_create_from_kvlist(route)) == 0); + + TEST_CHECK(cfl_kvlist_insert_array(routes, "logs", log_routes) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(input, "routes", routes) == 0); + TEST_CHECK(cfl_array_append(inputs, cfl_variant_create_from_kvlist(input)) == 0); + + inputs_variant = cfl_variant_create_from_array(inputs); + TEST_CHECK(inputs_variant != NULL); + + return inputs_variant; +} +static struct cfl_variant *create_duplicate_route_inputs() +{ + struct cfl_array *inputs; + struct cfl_kvlist *input; + struct cfl_kvlist *routes; + struct cfl_array *log_routes; + struct cfl_kvlist *route; + struct cfl_kvlist *condition; + struct cfl_kvlist *to; + struct cfl_array *outputs; + int idx; + + inputs = cfl_array_create(1); + TEST_CHECK(inputs != NULL); + if (!inputs) { + return NULL; + } + + input = cfl_kvlist_create(); + TEST_CHECK(input != NULL); + if (!input) { + cfl_array_destroy(inputs); + return NULL; + } + + TEST_CHECK(cfl_kvlist_insert_string(input, "name", "duplicate") == 0); + + routes = cfl_kvlist_create(); + TEST_CHECK(routes != NULL); + log_routes = cfl_array_create(2); + TEST_CHECK(log_routes != NULL); + + for (idx = 0; idx < 2; idx++) { + route = cfl_kvlist_create(); + TEST_CHECK(route != NULL); + TEST_CHECK(cfl_kvlist_insert_string(route, "name", "dup") == 0); + + condition = cfl_kvlist_create(); + TEST_CHECK(condition != NULL); + TEST_CHECK(cfl_kvlist_insert_bool(condition, "default", 1) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "condition", condition) == 0); + + outputs = cfl_array_create(1); + TEST_CHECK(outputs != NULL); + if (idx == 0) { + TEST_CHECK(cfl_array_append_string(outputs, "primary") == 0); + } + else { + TEST_CHECK(cfl_array_append_string(outputs, "secondary") == 0); + } + + to = cfl_kvlist_create(); + TEST_CHECK(to != NULL); + TEST_CHECK(cfl_kvlist_insert_array(to, "outputs", outputs) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "to", to) == 0); + + TEST_CHECK(cfl_array_append(log_routes, cfl_variant_create_from_kvlist(route)) == 0); + } + + TEST_CHECK(cfl_kvlist_insert_array(routes, "logs", log_routes) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(input, "routes", routes) == 0); + TEST_CHECK(cfl_array_append(inputs, cfl_variant_create_from_kvlist(input)) == 0); + + return cfl_variant_create_from_array(inputs); +} +void test_router_config_parse_basic() +{ + struct cfl_list routes; + struct cfl_variant *inputs; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct flb_route *first_route; + struct flb_route *second_route; + struct flb_route_output *output; + struct cfl_list *head; + struct cfl_list *route_head; + int ret; + + cfl_list_init(&routes); + + inputs = create_inputs_variant(); + TEST_CHECK(inputs != NULL); + if (!inputs) { + return; + } + + cf = cf_from_inputs_variant(inputs); + TEST_CHECK(cf != NULL); + if (!cf) { + cfl_variant_destroy(inputs); + return; + } + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(cfl_list_size(&routes) == 1); + head = routes.next; + input_routes = cfl_list_entry(head, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "opentelemetry") == 0); + TEST_CHECK(cfl_list_size(&input_routes->processors) == 1); + TEST_CHECK(cfl_list_size(&input_routes->routes) == 2); + + route_head = input_routes->routes.next; + first_route = cfl_list_entry(route_head, struct flb_route, _head); + TEST_CHECK(strcmp(first_route->name, "error_logs") == 0); + TEST_CHECK(first_route->signals == FLB_ROUTER_SIGNAL_LOGS); + TEST_CHECK(first_route->condition != NULL); + TEST_CHECK(cfl_list_size(&first_route->outputs) == 1); + output = cfl_list_entry(first_route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "loki") == 0); + + route_head = route_head->next; + second_route = cfl_list_entry(route_head, struct flb_route, _head); + TEST_CHECK(second_route->condition != NULL); + TEST_CHECK(second_route->condition->is_default == FLB_TRUE); + TEST_CHECK(cfl_list_size(&second_route->outputs) == 1); + TEST_CHECK(second_route->signals == FLB_ROUTER_SIGNAL_LOGS); + output = cfl_list_entry(second_route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->fallback, "s3_backup") == 0); + + flb_router_routes_destroy(&routes); + } + + if (ret != 0) { + cfl_list_init(&routes); + } + + flb_cf_destroy(cf); + cfl_variant_destroy(inputs); +} + +void test_router_config_duplicate_route() +{ + struct cfl_list routes; + struct cfl_variant *inputs; + struct flb_cf *cf; + int ret; + + cfl_list_init(&routes); + + inputs = create_duplicate_route_inputs(); + TEST_CHECK(inputs != NULL); + if (!inputs) { + return; + } + + cf = cf_from_inputs_variant(inputs); + TEST_CHECK(cf != NULL); + if (!cf) { + cfl_variant_destroy(inputs); + return; + } + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret != 0); + TEST_CHECK(cfl_list_is_empty(&routes) == 1); + + flb_cf_destroy(cf); + cfl_variant_destroy(inputs); +} + +void test_router_config_parse_file_basic() +{ + struct cfl_list routes; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct cfl_list *routes_head; + struct cfl_list *route_head; + struct flb_route *route; + struct flb_route_output *output; + int seen_error; + int seen_metrics; + int seen_default; + int ret; + + cf = load_cf_from_yaml(FLB_ROUTER_TEST_FILE("basic.yaml")); + TEST_CHECK(cf != NULL); + if (!cf) { + return; + } + + cfl_list_init(&routes); + seen_error = FLB_FALSE; + seen_metrics = FLB_FALSE; + seen_default = FLB_FALSE; + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret == 0) { + routes_head = routes.next; + input_routes = cfl_list_entry(routes_head, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "opentelemetry") == 0); + TEST_CHECK(cfl_list_size(&input_routes->routes) == 3); + + cfl_list_foreach(route_head, &input_routes->routes) { + route = cfl_list_entry(route_head, struct flb_route, _head); + if (strcmp(route->name, "error_logs") == 0) { + seen_error = FLB_TRUE; + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_LOGS); + output = cfl_list_entry(route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "loki") == 0); + TEST_CHECK(strcmp(output->fallback, "s3_backup") == 0); + } + else if (strcmp(route->name, "metrics_above_threshold") == 0) { + seen_metrics = FLB_TRUE; + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_METRICS); + } + else if (strcmp(route->name, "default") == 0) { + seen_default = FLB_TRUE; + TEST_CHECK(route->condition != NULL); + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_LOGS); + TEST_CHECK(route->condition->is_default == FLB_TRUE); + output = cfl_list_entry(route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "elasticsearch") == 0); + } + } + + TEST_CHECK(seen_error == FLB_TRUE); + TEST_CHECK(seen_metrics == FLB_TRUE); + TEST_CHECK(seen_default == FLB_TRUE); + + flb_router_routes_destroy(&routes); + } + + flb_cf_destroy(cf); +} + +void test_router_config_parse_file_multi_signal() +{ + struct cfl_list routes; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct flb_route *route; + struct flb_route_output *output; + struct cfl_list *route_head; + struct cfl_list *output_head; + int seen_multi; + int seen_default; + int outputs; + int ret; + + cf = load_cf_from_yaml(FLB_ROUTER_TEST_FILE("multi_signal.yaml")); + TEST_CHECK(cf != NULL); + if (!cf) { + return; + } + + cfl_list_init(&routes); + seen_multi = FLB_FALSE; + seen_default = FLB_FALSE; + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret == 0) { + input_routes = cfl_list_entry(routes.next, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "telemetry") == 0); + + cfl_list_foreach(route_head, &input_routes->routes) { + route = cfl_list_entry(route_head, struct flb_route, _head); + if (strcmp(route->name, "service_checkout") == 0) { + seen_multi = FLB_TRUE; + TEST_CHECK(route->signals == + (FLB_ROUTER_SIGNAL_LOGS | FLB_ROUTER_SIGNAL_TRACES)); + } + else if (strcmp(route->name, "catch_all") == 0) { + seen_default = FLB_TRUE; + TEST_CHECK(route->condition != NULL && + route->condition->is_default == FLB_TRUE); + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_LOGS); + + outputs = 0; + cfl_list_foreach(output_head, &route->outputs) { + output = cfl_list_entry(output_head, struct flb_route_output, _head); + outputs++; + if (strcmp(output->name, "s3_archive") == 0) { + TEST_CHECK(strcmp(output->fallback, "glacier") == 0); + } + } + TEST_CHECK(outputs == 2); + } + } + + TEST_CHECK(seen_multi == FLB_TRUE); + TEST_CHECK(seen_default == FLB_TRUE); + + flb_router_routes_destroy(&routes); + } + + flb_cf_destroy(cf); +} + +void test_router_config_parse_file_metrics() +{ + struct cfl_list routes; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct flb_route *route; + struct flb_route_output *output; + int ret; + + cf = load_cf_from_yaml(FLB_ROUTER_TEST_FILE("metrics.yaml")); + TEST_CHECK(cf != NULL); + if (!cf) { + return; + } + + cfl_list_init(&routes); + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret == 0) { + input_routes = cfl_list_entry(routes.next, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "metrics") == 0); + TEST_CHECK(cfl_list_size(&input_routes->routes) == 1); + + route = cfl_list_entry(input_routes->routes.next, + struct flb_route, _head); + TEST_CHECK(strcmp(route->name, "cpu_hot") == 0); + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_METRICS); + + output = cfl_list_entry(route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "prometheus_remote") == 0); + TEST_CHECK(strcmp(output->fallback, "s3_backup") == 0); + + flb_router_routes_destroy(&routes); + } + + flb_cf_destroy(cf); +} + +static void setup_test_instances(struct flb_config *config, + struct flb_input_instance *input, + struct flb_input_plugin *input_plugin, + const char *input_alias, + const char *input_type, + struct flb_output_instance *output, + struct flb_output_plugin *output_plugin, + const char *output_alias, + const char *output_type) +{ + memset(config, 0, sizeof(struct flb_config)); + mk_list_init(&config->inputs); + mk_list_init(&config->outputs); + cfl_list_init(&config->input_routes); + + memset(input, 0, sizeof(struct flb_input_instance)); + mk_list_init(&input->_head); + mk_list_init(&input->routes_direct); + mk_list_init(&input->routes); + mk_list_init(&input->tasks); + mk_list_init(&input->chunks); + mk_list_init(&input->collectors); + snprintf(input->name, sizeof(input->name), "%s.0", input_type); + if (input_alias) { + input->alias = flb_sds_create(input_alias); + TEST_CHECK(input->alias != NULL); + } + else { + input->alias = NULL; + } + input_plugin->name = (char *) input_type; + input->p = input_plugin; + mk_list_add(&input->_head, &config->inputs); + + memset(output, 0, sizeof(struct flb_output_instance)); + mk_list_init(&output->_head); + mk_list_init(&output->properties); + mk_list_init(&output->net_properties); + snprintf(output->name, sizeof(output->name), "%s.0", output_type); + if (output_alias) { + output->alias = flb_sds_create(output_alias); + TEST_CHECK(output->alias != NULL); + } + else { + output->alias = NULL; + } + output->event_type = FLB_OUTPUT_LOGS; + output_plugin->name = (char *) output_type; + output->p = output_plugin; + mk_list_add(&output->_head, &config->outputs); +} + +void test_router_apply_config_success() +{ + struct flb_config config; + struct flb_input_instance input; + struct flb_output_instance output; + struct flb_input_routes input_routes; + struct flb_route route; + struct flb_route_output route_output; + struct flb_input_plugin input_plugin; + struct flb_output_plugin output_plugin; + struct flb_router_path *path; + + setup_test_instances(&config, &input, &input_plugin, "dummy", "dummy", + &output, &output_plugin, "printme", "stdout"); + + memset(&input_routes, 0, sizeof(input_routes)); + cfl_list_init(&input_routes._head); + cfl_list_init(&input_routes.routes); + input_routes.input_name = flb_sds_create("dummy"); + cfl_list_add(&input_routes._head, &config.input_routes); + + memset(&route, 0, sizeof(route)); + cfl_list_init(&route._head); + cfl_list_init(&route.outputs); + route.name = flb_sds_create("error_logs"); + route.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route._head, &input_routes.routes); + + memset(&route_output, 0, sizeof(route_output)); + cfl_list_init(&route_output._head); + route_output.name = flb_sds_create("printme"); + cfl_list_add(&route_output._head, &route.outputs); + + TEST_CHECK(flb_router_apply_config(&config) == 0); + TEST_CHECK(mk_list_size(&input.routes_direct) == 1); + + path = mk_list_entry(input.routes_direct.next, struct flb_router_path, _head); + TEST_CHECK(path->ins == &output); + + flb_router_exit(&config); + + flb_sds_destroy(input.alias); + flb_sds_destroy(output.alias); + flb_sds_destroy(input_routes.input_name); + flb_sds_destroy(route.name); + flb_sds_destroy(route_output.name); +} + +void test_router_apply_config_missing_output() +{ + struct flb_config config; + struct flb_input_instance input; + struct flb_output_instance output; + struct flb_input_routes input_routes; + struct flb_route route; + struct flb_route_output route_output; + struct flb_input_plugin input_plugin; + struct flb_output_plugin output_plugin; + + setup_test_instances(&config, &input, &input_plugin, "dummy", "dummy", + &output, &output_plugin, "printme", "stdout"); + + memset(&input_routes, 0, sizeof(input_routes)); + cfl_list_init(&input_routes._head); + cfl_list_init(&input_routes.routes); + input_routes.input_name = flb_sds_create("dummy"); + cfl_list_add(&input_routes._head, &config.input_routes); + + memset(&route, 0, sizeof(route)); + cfl_list_init(&route._head); + cfl_list_init(&route.outputs); + route.name = flb_sds_create("error_logs"); + route.signals = FLB_ROUTER_SIGNAL_LOGS; + cfl_list_add(&route._head, &input_routes.routes); + + memset(&route_output, 0, sizeof(route_output)); + cfl_list_init(&route_output._head); + route_output.name = flb_sds_create("unknown"); + cfl_list_add(&route_output._head, &route.outputs); + + TEST_CHECK(flb_router_apply_config(&config) == 0); + + /* note mk_list_is_empty return 0 if is empty */ + TEST_CHECK(!mk_list_is_empty(&input.routes_direct)); + + flb_router_exit(&config); + + flb_sds_destroy(input.alias); + flb_sds_destroy(output.alias); + flb_sds_destroy(input_routes.input_name); + flb_sds_destroy(route.name); + flb_sds_destroy(route_output.name); +} + +void test_router_route_default_precedence() +{ + struct cfl_list routes; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct flb_route *route; + struct flb_route_output *output; + struct flb_event_chunk chunk; + int ret; + int match; + + cf = flb_cf_yaml_create(NULL, (char *) FLB_ROUTER_TEST_FILE("precedence.yaml"), NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + return; + } + + cfl_list_init(&routes); + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret != 0) { + flb_cf_destroy(cf); + return; + } + + input_routes = cfl_list_entry(routes.next, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "lib") == 0); + + route = cfl_list_entry(input_routes->routes.next, struct flb_route, _head); + TEST_CHECK(route->condition != NULL); + TEST_CHECK(route->condition->is_default == FLB_TRUE); + + memset(&chunk, 0, sizeof(chunk)); + chunk.type = FLB_EVENT_TYPE_LOGS; + + TEST_CHECK(flb_route_condition_eval(&chunk, route) == FLB_TRUE); + + output = cfl_list_entry(route->outputs.next, struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "lib_route") == 0); + + match = flb_router_match("lib.input", strlen("lib.input"), "does-not-match", NULL); + TEST_CHECK(match == FLB_FALSE); + + flb_router_routes_destroy(&routes); + flb_cf_destroy(cf); +} + +TEST_LIST = { + { "parse_basic", test_router_config_parse_basic }, + { "duplicate_route", test_router_config_duplicate_route }, + { "parse_basic_file", test_router_config_parse_file_basic }, + { "parse_multi_signal_file", test_router_config_parse_file_multi_signal }, + { "parse_metrics_file", test_router_config_parse_file_metrics }, + { "apply_config_success", test_router_apply_config_success }, + { "apply_config_missing_output", test_router_apply_config_missing_output }, + { "route_default_precedence", test_router_route_default_precedence }, + { 0 } +};