diff --git a/ROUTER.md b/ROUTER.md new file mode 100644 index 00000000000..fd73dc63e1c --- /dev/null +++ b/ROUTER.md @@ -0,0 +1,108 @@ +# Telemetry Routing Overview + +Fluent Bit's telemetry router provides a unified configuration surface for +logs, metrics, and traces. Routes are defined on a per-input basis and apply to +all signals produced by that input. Each route declares an optional signal +filter, a condition, per-route processors, and the list of output targets that +should receive matching telemetry. + +## Configuration Structure + +Routes are declared inside `pipeline.inputs[]` entries in the Fluent Bit YAML +configuration: + +```yaml +pipeline: + inputs: + - name: opentelemetry + processors: + - name: parser + parser: json + routes: + logs: + - name: errors + condition: + rules: + - field: "$level" + op: eq + value: "error" + to: + outputs: + - name: loki + fallback: s3_archive + - name: default + condition: + default: true + to: + outputs: + - name: elasticsearch + metrics: + - name: cpu_hot + condition: + rules: + - field: "$metric.name" + op: regex + value: "^cpu_" + to: + outputs: + - name: prometheus_remote +``` + +### Key elements + +* **Input processors** – shared processors executed before any routing logic. +* **Routes** – grouped by telemetry signal. Each key under `routes` must be a + signal label (`logs`, `metrics`, `traces`, or a comma-separated combination) + whose array value contains the ordered route definitions for that signal. A + key of `any` targets all telemetry types. + * `condition.rules` contains record accessor comparisons evaluated against the + chunk. Routes can also mark themselves as the default handler with + `condition.default: true`. + * `processors` (optional) define per-route processor chains executed after the + condition succeeds and before dispatching to outputs. + * `to.outputs` lists the primary output targets. Entries may be simple names + or objects with an optional `fallback` output used when the primary target + fails. + +## Evaluation Order + +1. Execute input-level processors. +2. For each route whose signal mask matches the chunk type: + 1. Evaluate the route condition. Routes flagged as `default` bypass rule + evaluation. + 2. If the condition succeeds, run the route processors and then attempt to + send the chunk to each configured output. +3. When an output write fails permanently, the router retries once using the + configured fallback output (if any). + +## Conditions and Field Resolution + +Route conditions rely on record accessors that are aware of the telemetry type: + +* **Logs** – record keys, metadata, `exists`, `eq`, `regex`, `contains`, `in`. +* **Metrics** – metric name/value, resource and attribute keys, numeric + comparison operators (`gt`, `lt`, `gte`, `lte`). +* **Traces** – span fields and resource/scope attributes with equality, regex, + and duration comparisons. + +The loader validates that each referenced field is supported by the selected +signals, providing early feedback during configuration parsing. + +## Output Fallbacks + +Outputs may declare a secondary target that receives the chunk when the primary +write fails with a permanent error. Fallback dispatch happens once per output +attempt and produces debug logs as well as `fluentbit_routing_fallback_total` +metrics to aid troubleshooting. + +## Metrics and Observability + +The router exposes Prometheus counters using the `fluentbit_routing_*` prefix +covering routed records, bytes, condition failures, and fallback events. The +labels capture the input name, route name, target output, and signal type. + +## Cleanup Helpers + +The router exposes `flb_router_routes_destroy()` to release all resources +allocated during YAML parsing. Callers should destroy the list after invoking +`flb_router_config_parse()` once the configuration has been consumed. diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index 81528248eb6..744000c3f72 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -23,6 +23,11 @@ #include #include #include +#include +#include +#include +#include +#include struct flb_router_path { struct flb_output_instance *ins; @@ -56,6 +61,61 @@ static inline int flb_router_match_type(int in_event_type, return FLB_TRUE; } +enum flb_router_signal { + FLB_ROUTER_SIGNAL_LOGS = (1U << 0), + FLB_ROUTER_SIGNAL_METRICS = (1U << 1), + FLB_ROUTER_SIGNAL_TRACES = (1U << 2), + FLB_ROUTER_SIGNAL_ANY = (FLB_ROUTER_SIGNAL_LOGS | + FLB_ROUTER_SIGNAL_METRICS | + FLB_ROUTER_SIGNAL_TRACES) +}; + +struct flb_route_condition_rule { + flb_sds_t field; + flb_sds_t op; + flb_sds_t value; + struct cfl_list _head; +}; + +struct flb_route_condition { + struct cfl_list rules; + int is_default; +}; + +struct flb_route_output { + flb_sds_t name; + flb_sds_t fallback; + struct cfl_list _head; +}; + +struct flb_route_processor_property { + flb_sds_t key; + flb_sds_t value; + struct cfl_list _head; +}; + +struct flb_route_processor { + flb_sds_t name; + struct cfl_list properties; + struct cfl_list _head; +}; + +struct flb_route { + flb_sds_t name; + uint32_t signals; + struct flb_route_condition *condition; + struct cfl_list outputs; + struct cfl_list processors; + struct cfl_list _head; +}; + +struct flb_input_routes { + flb_sds_t input_name; + struct cfl_list processors; + struct cfl_list routes; + struct cfl_list _head; +}; + int flb_router_connect(struct flb_input_instance *in, struct flb_output_instance *out); int flb_router_connect_direct(struct flb_input_instance *in, @@ -65,4 +125,24 @@ int flb_router_match(const char *tag, int tag_len, const char *match, void *match_regex); int flb_router_io_set(struct flb_config *config); void flb_router_exit(struct flb_config *config); + +uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk); + +int flb_route_condition_eval(struct flb_event_chunk *chunk, + struct flb_route *route); +int flb_condition_eval_logs(struct flb_event_chunk *chunk, + struct flb_route *route); +int flb_condition_eval_metrics(struct flb_event_chunk *chunk, + struct flb_route *route); +int flb_condition_eval_traces(struct flb_event_chunk *chunk, + struct flb_route *route); + +struct flb_cf; + +int flb_router_config_parse(struct flb_cf *cf, + struct cfl_list *input_routes, + struct flb_config *config); +void flb_router_routes_destroy(struct cfl_list *input_routes); + #endif + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 13bc3e4418f..c00a1bb13c5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -54,6 +54,8 @@ set(src flb_upstream_ha.c flb_upstream_node.c flb_router.c + flb_router_condition.c + flb_router_config.c flb_worker.c flb_coro.c flb_time.c diff --git a/src/flb_router_condition.c b/src/flb_router_condition.c new file mode 100644 index 00000000000..318227a094a --- /dev/null +++ b/src/flb_router_condition.c @@ -0,0 +1,123 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk) +{ + if (!chunk) { + return 0; + } + + switch (chunk->type) { + case FLB_EVENT_TYPE_LOGS: + return FLB_ROUTER_SIGNAL_LOGS; + case FLB_EVENT_TYPE_METRICS: + return FLB_ROUTER_SIGNAL_METRICS; + case FLB_EVENT_TYPE_TRACES: + return FLB_ROUTER_SIGNAL_TRACES; + default: + break; + } + + return 0; +} + +int flb_condition_eval_logs(struct flb_event_chunk *chunk, + struct flb_route *route) +{ + (void) chunk; + (void) route; + + /* + * The full condition evaluation engine requires field resolvers that map + * record accessors to the different telemetry payload shapes. The wiring + * of those resolvers is part of a bigger effort and will be implemented in + * follow-up changes. For the time being we simply report that the + * condition failed so that the runtime can rely on explicit default + * routes. + */ + return FLB_FALSE; +} + +int flb_condition_eval_metrics(struct flb_event_chunk *chunk, + struct flb_route *route) +{ + (void) chunk; + (void) route; + + return FLB_FALSE; +} + +int flb_condition_eval_traces(struct flb_event_chunk *chunk, + struct flb_route *route) +{ + (void) chunk; + (void) route; + + return FLB_FALSE; +} + +int flb_route_condition_eval(struct flb_event_chunk *chunk, + struct flb_route *route) +{ + uint32_t signal; + + if (!route) { + return FLB_FALSE; + } + + if (!route->condition) { + return FLB_TRUE; + } + + signal = flb_router_signal_from_chunk(chunk); + if (signal == 0) { + return FLB_FALSE; + } + + if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && + ((route->signals & signal) == 0)) { + return FLB_FALSE; + } + + if (route->condition->is_default) { + return FLB_TRUE; + } + + if (cfl_list_is_empty(&route->condition->rules) == 0) { + return FLB_TRUE; + } + + switch (signal) { + case FLB_ROUTER_SIGNAL_LOGS: + return flb_condition_eval_logs(chunk, route); + case FLB_ROUTER_SIGNAL_METRICS: + return flb_condition_eval_metrics(chunk, route); + case FLB_ROUTER_SIGNAL_TRACES: + return flb_condition_eval_traces(chunk, route); + default: + break; + } + + return FLB_FALSE; +} + diff --git a/src/flb_router_config.c b/src/flb_router_config.c new file mode 100644 index 00000000000..d5567fbaaf9 --- /dev/null +++ b/src/flb_router_config.c @@ -0,0 +1,1043 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the \"License\"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an \"AS IS\" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#ifndef _WIN32 +#include +#endif + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +static flb_sds_t copy_from_cfl_sds(cfl_sds_t value) +{ + if (!value) { + return NULL; + } + + return flb_sds_create_len(value, cfl_sds_len(value)); +} + +static flb_sds_t variant_to_sds(struct cfl_variant *var) +{ + char tmp[64]; + int len; + + if (!var) { + return NULL; + } + + switch (var->type) { + case CFL_VARIANT_STRING: + return copy_from_cfl_sds(var->data.as_string); + case CFL_VARIANT_INT: + len = snprintf(tmp, sizeof(tmp), \"%\" PRId64, var->data.as_int64); + if (len < 0) { + return NULL; + } + return flb_sds_create_len(tmp, len); + case CFL_VARIANT_UINT: + len = snprintf(tmp, sizeof(tmp), \"%\" PRIu64, var->data.as_uint64); + if (len < 0) { + return NULL; + } + return flb_sds_create_len(tmp, len); + case CFL_VARIANT_DOUBLE: + len = snprintf(tmp, sizeof(tmp), \"%.*g\", 17, var->data.as_double); + if (len < 0) { + return NULL; + } + return flb_sds_create_len(tmp, len); + case CFL_VARIANT_BOOL: + return flb_sds_create(var->data.as_bool ? \"true\" : \"false\"); + default: + break; + } + + return NULL; +} + +static int variant_to_bool(struct cfl_variant *var, int *out) +{ + if (!var || !out) { + return -1; + } + + if (var->type == CFL_VARIANT_BOOL) { + *out = var->data.as_bool != 0; + return 0; + } + else if (var->type == CFL_VARIANT_STRING && var->data.as_string) { + const char *val = var->data.as_string; + + if (strcasecmp(val, \"true\") == 0) { + *out = FLB_TRUE; + return 0; + } + else if (strcasecmp(val, \"false\") == 0) { + *out = FLB_FALSE; + return 0; + } + } + + return -1; +} + +static int field_allowed_for_logs(const char *field) +{ + if (!field) { + return FLB_FALSE; + } + + if (strncmp(field, \"$metric.\", 8) == 0) { + return FLB_FALSE; + } + if (strncmp(field, \"$span.\", 6) == 0) { + return FLB_FALSE; + } + if (strncmp(field, \"$scope.\", 7) == 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int field_allowed_for_metrics(const char *field) +{ + if (!field) { + return FLB_FALSE; + } + + if (strncmp(field, \"$metric.\", 8) == 0) { + return FLB_TRUE; + } + if (strncmp(field, \"$resource[\", 10) == 0) { + return FLB_TRUE; + } + if (strncmp(field, \"$attributes[\", 12) == 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static int field_allowed_for_traces(const char *field) +{ + if (!field) { + return FLB_FALSE; + } + + if (strncmp(field, \"$span.\", 6) == 0) { + return FLB_TRUE; + } + if (strncmp(field, \"$resource[\", 10) == 0) { + return FLB_TRUE; + } + if (strncmp(field, \"$scope[\", 7) == 0) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static int validate_rule_field(const char *field, uint32_t signals) +{ + int ok = FLB_FALSE; + + if (!field) { + return FLB_FALSE; + } + + if (signals == FLB_ROUTER_SIGNAL_ANY) { + signals = FLB_ROUTER_SIGNAL_LOGS | + FLB_ROUTER_SIGNAL_METRICS | + FLB_ROUTER_SIGNAL_TRACES; + } + + if (signals & FLB_ROUTER_SIGNAL_LOGS) { + if (field_allowed_for_logs(field)) { + ok = FLB_TRUE; + } + else { + return FLB_FALSE; + } + } + + if (signals & FLB_ROUTER_SIGNAL_METRICS) { + if (field_allowed_for_metrics(field)) { + ok = FLB_TRUE; + } + else { + return FLB_FALSE; + } + } + + if (signals & FLB_ROUTER_SIGNAL_TRACES) { + if (field_allowed_for_traces(field)) { + ok = FLB_TRUE; + } + else { + return FLB_FALSE; + } + } + + return ok; +} + +static uint32_t parse_signal_key(const char *key) +{ + const char *cursor; + uint32_t mask = 0; + + if (!key) { + return 0; + } + + cursor = key; + while (*cursor) { + const char *start; + size_t len; + + while (*cursor && (isspace((unsigned char) *cursor) || + *cursor == ',' || *cursor == '|' || *cursor == '+')) { + cursor++; + } + + if (*cursor == '\0') { + break; + } + + start = cursor; + while (*cursor && !isspace((unsigned char) *cursor) && + *cursor != ',' && *cursor != '|' && *cursor != '+') { + cursor++; + } + + len = cursor - start; + if (len == 0) { + continue; + } + + if (len == 4 && strncasecmp(start, "logs", len) == 0) { + mask |= FLB_ROUTER_SIGNAL_LOGS; + } + else if (len == 7 && strncasecmp(start, "metrics", len) == 0) { + mask |= FLB_ROUTER_SIGNAL_METRICS; + } + else if (len == 6 && strncasecmp(start, "traces", len) == 0) { + mask |= FLB_ROUTER_SIGNAL_TRACES; + } + else if (len == 3 && strncasecmp(start, "any", len) == 0) { + mask |= FLB_ROUTER_SIGNAL_ANY; + } + else { + return 0; + } + } + + return mask; +} + +static void route_condition_destroy(struct flb_route_condition *condition) +{ + struct cfl_list *tmp; + struct cfl_list *head; + + if (!condition) { + return; + } + + cfl_list_foreach_safe(head, tmp, &condition->rules) { + struct flb_route_condition_rule *rule; + + rule = cfl_list_entry(head, struct flb_route_condition_rule, _head); + cfl_list_del(&rule->_head); + + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + if (rule->value) { + flb_sds_destroy(rule->value); + } + + flb_free(rule); + } + + flb_free(condition); +} + +static void route_outputs_destroy(struct flb_route *route) +{ + struct cfl_list *tmp; + struct cfl_list *head; + + cfl_list_foreach_safe(head, tmp, &route->outputs) { + struct flb_route_output *output; + + output = cfl_list_entry(head, struct flb_route_output, _head); + cfl_list_del(&output->_head); + + if (output->name) { + flb_sds_destroy(output->name); + } + if (output->fallback) { + flb_sds_destroy(output->fallback); + } + flb_free(output); + } +} + +static void route_processors_destroy(struct cfl_list *processors) +{ + struct cfl_list *tmp; + struct cfl_list *head; + + if (!processors) { + return; + } + + cfl_list_foreach_safe(head, tmp, processors) { + struct flb_route_processor *processor; + struct cfl_list *p_head; + struct cfl_list *p_tmp; + + processor = cfl_list_entry(head, struct flb_route_processor, _head); + cfl_list_del(&processor->_head); + + cfl_list_foreach_safe(p_head, p_tmp, &processor->properties) { + struct flb_route_processor_property *prop; + + prop = cfl_list_entry(p_head, struct flb_route_processor_property, _head); + cfl_list_del(&prop->_head); + + if (prop->key) { + flb_sds_destroy(prop->key); + } + if (prop->value) { + flb_sds_destroy(prop->value); + } + flb_free(prop); + } + + if (processor->name) { + flb_sds_destroy(processor->name); + } + flb_free(processor); + } +} + +void flb_router_routes_destroy(struct cfl_list *input_routes) +{ + struct cfl_list *head; + struct cfl_list *tmp; + + if (!input_routes) { + return; + } + + cfl_list_foreach_safe(head, tmp, input_routes) { + struct flb_input_routes *routes; + struct cfl_list *r_head; + struct cfl_list *r_tmp; + + routes = cfl_list_entry(head, struct flb_input_routes, _head); + cfl_list_del(&routes->_head); + + route_processors_destroy(&routes->processors); + + cfl_list_foreach_safe(r_head, r_tmp, &routes->routes) { + struct flb_route *route; + + route = cfl_list_entry(r_head, struct flb_route, _head); + cfl_list_del(&route->_head); + + if (route->condition) { + route_condition_destroy(route->condition); + } + + route_outputs_destroy(route); + route_processors_destroy(&route->processors); + + if (route->name) { + flb_sds_destroy(route->name); + } + flb_free(route); + } + + if (routes->input_name) { + flb_sds_destroy(routes->input_name); + } + + flb_free(routes); + } +} + +static int add_processor_properties(struct flb_route_processor *processor, + struct cfl_kvlist *kvlist) +{ + struct cfl_list *head; + + if (!processor || !kvlist) { + return -1; + } + + cfl_list_foreach(head, &kvlist->list) { + struct cfl_kvpair *pair; + struct flb_route_processor_property *prop; + + pair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (strcmp(pair->key, \"name\") == 0) { + continue; + } + + prop = flb_calloc(1, sizeof(struct flb_route_processor_property)); + if (!prop) { + flb_errno(); + return -1; + } + + cfl_list_init(&prop->_head); + + prop->key = flb_sds_create_len(pair->key, cfl_sds_len(pair->key)); + if (!prop->key) { + flb_free(prop); + return -1; + } + + prop->value = variant_to_sds(pair->val); + if (!prop->value) { + flb_sds_destroy(prop->key); + flb_free(prop); + return -1; + } + + cfl_list_add(&prop->_head, &processor->properties); + } + + return 0; +} + +static int parse_processors(struct cfl_variant *variant, + struct cfl_list *out_list, + struct flb_config *config) +{ + size_t idx; + struct cfl_array *array; + + (void) config; + + if (!variant || !out_list) { + return -1; + } + + if (variant->type != CFL_VARIANT_ARRAY) { + return -1; + } + + array = variant->data.as_array; + for (idx = 0; idx < cfl_array_size(array); idx++) { + struct cfl_variant *entry; + struct cfl_kvlist *kvlist; + struct cfl_variant *name_var; + struct flb_route_processor *processor; + + entry = cfl_array_fetch_by_index(array, idx); + if (!entry || entry->type != CFL_VARIANT_KVLIST) { + return -1; + } + + kvlist = entry->data.as_kvlist; + name_var = cfl_kvlist_fetch(kvlist, \"name\"); + if (!name_var || name_var->type != CFL_VARIANT_STRING) { + return -1; + } + + processor = flb_calloc(1, sizeof(struct flb_route_processor)); + if (!processor) { + flb_errno(); + return -1; + } + cfl_list_init(&processor->_head); + cfl_list_init(&processor->properties); + + processor->name = copy_from_cfl_sds(name_var->data.as_string); + if (!processor->name) { + flb_free(processor); + return -1; + } + + if (add_processor_properties(processor, kvlist) != 0) { + route_processors_destroy(&processor->properties); + if (processor->name) { + flb_sds_destroy(processor->name); + } + flb_free(processor); + return -1; + } + + cfl_list_add(&processor->_head, out_list); + } + + return 0; +} + +static struct flb_route_condition_rule *parse_condition_rule(struct cfl_variant *variant) +{ + struct flb_route_condition_rule *rule; + struct cfl_kvlist *kvlist; + struct cfl_variant *field_var; + struct cfl_variant *op_var; + struct cfl_variant *value_var; + + if (!variant || variant->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + kvlist = variant->data.as_kvlist; + field_var = cfl_kvlist_fetch(kvlist, \"field\"); + op_var = cfl_kvlist_fetch(kvlist, \"op\"); + value_var = cfl_kvlist_fetch(kvlist, \"value\"); + + if (!field_var || field_var->type != CFL_VARIANT_STRING) { + return NULL; + } + if (!op_var || op_var->type != CFL_VARIANT_STRING) { + return NULL; + } + + rule = flb_calloc(1, sizeof(struct flb_route_condition_rule)); + if (!rule) { + flb_errno(); + return NULL; + } + cfl_list_init(&rule->_head); + + rule->field = copy_from_cfl_sds(field_var->data.as_string); + if (!rule->field) { + flb_free(rule); + return NULL; + } + + rule->op = copy_from_cfl_sds(op_var->data.as_string); + if (!rule->op) { + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + + if (value_var) { + rule->value = variant_to_sds(value_var); + if (!rule->value && strcmp(rule->op, \"exists\") != 0) { + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + } + + return rule; +} + +static struct flb_route_condition *parse_condition(struct cfl_variant *variant, + uint32_t signals) +{ + struct flb_route_condition *condition; + struct cfl_variant *rules_var; + struct cfl_variant *default_var; + struct cfl_array *rules_array; + size_t idx; + + if (!variant || variant->type != CFL_VARIANT_KVLIST) { + return NULL; + } + + condition = flb_calloc(1, sizeof(struct flb_route_condition)); + if (!condition) { + flb_errno(); + return NULL; + } + cfl_list_init(&condition->rules); + + rules_var = cfl_kvlist_fetch(variant->data.as_kvlist, \"rules\"); + default_var = cfl_kvlist_fetch(variant->data.as_kvlist, \"default\"); + + if (default_var) { + int val; + + if (variant_to_bool(default_var, &val) != 0) { + route_condition_destroy(condition); + return NULL; + } + condition->is_default = val; + } + + if (rules_var) { + if (rules_var->type != CFL_VARIANT_ARRAY) { + route_condition_destroy(condition); + return NULL; + } + + rules_array = rules_var->data.as_array; + for (idx = 0; idx < cfl_array_size(rules_array); idx++) { + struct cfl_variant *entry; + struct flb_route_condition_rule *rule; + + entry = cfl_array_fetch_by_index(rules_array, idx); + rule = parse_condition_rule(entry); + if (!rule) { + route_condition_destroy(condition); + return NULL; + } + + if (!validate_rule_field(rule->field, signals)) { + route_condition_destroy(condition); + return NULL; + } + + cfl_list_add(&rule->_head, &condition->rules); + } + } + + if (!condition->is_default && cfl_list_is_empty(&condition->rules) == 1) { + route_condition_destroy(condition); + return NULL; + } + + return condition; +} + +static int add_output_from_variant(struct flb_route *route, + struct cfl_variant *variant) +{ + struct flb_route_output *output; + + if (!route || !variant) { + return -1; + } + + output = flb_calloc(1, sizeof(struct flb_route_output)); + if (!output) { + flb_errno(); + return -1; + } + cfl_list_init(&output->_head); + + if (variant->type == CFL_VARIANT_STRING) { + output->name = copy_from_cfl_sds(variant->data.as_string); + if (!output->name) { + flb_free(output); + return -1; + } + } + else if (variant->type == CFL_VARIANT_KVLIST) { + struct cfl_variant *name_var; + struct cfl_variant *fallback_var; + + name_var = cfl_kvlist_fetch(variant->data.as_kvlist, \"name\"); + if (!name_var || name_var->type != CFL_VARIANT_STRING) { + flb_free(output); + return -1; + } + output->name = copy_from_cfl_sds(name_var->data.as_string); + if (!output->name) { + flb_free(output); + return -1; + } + + fallback_var = cfl_kvlist_fetch(variant->data.as_kvlist, \"fallback\"); + if (fallback_var) { + if (fallback_var->type != CFL_VARIANT_STRING) { + flb_sds_destroy(output->name); + flb_free(output); + return -1; + } + output->fallback = copy_from_cfl_sds(fallback_var->data.as_string); + if (!output->fallback) { + flb_sds_destroy(output->name); + flb_free(output); + return -1; + } + } + } + else { + flb_free(output); + return -1; + } + + cfl_list_add(&output->_head, &route->outputs); + return 0; +} + +static int parse_outputs(struct cfl_variant *variant, struct flb_route *route) +{ + size_t idx; + + if (!variant || !route) { + return -1; + } + + if (variant->type == CFL_VARIANT_ARRAY) { + struct cfl_array *array = variant->data.as_array; + + if (cfl_array_size(array) == 0) { + return -1; + } + + for (idx = 0; idx < cfl_array_size(array); idx++) { + struct cfl_variant *entry; + + entry = cfl_array_fetch_by_index(array, idx); + if (!entry) { + return -1; + } + + if (add_output_from_variant(route, entry) != 0) { + return -1; + } + } + + return 0; + } + + if (add_output_from_variant(route, variant) != 0) { + return -1; + } + + return 0; +} + +static int route_name_exists(struct cfl_list *routes, flb_sds_t name) +{ + struct cfl_list *head; + + if (!routes || !name) { + return FLB_FALSE; + } + + cfl_list_foreach(head, routes) { + struct flb_route *route; + + route = cfl_list_entry(head, struct flb_route, _head); + if (route->name && strcmp(route->name, name) == 0) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int parse_route(struct cfl_variant *variant, + struct flb_input_routes *input, + struct flb_config *config, + uint32_t signals) +{ + struct flb_route *route; + struct cfl_kvlist *kvlist; + struct cfl_variant *name_var; + struct cfl_variant *condition_var; + struct cfl_variant *processors_var; + struct cfl_variant *to_var; + struct cfl_variant *outputs_var; + + (void) config; + + if (!variant || variant->type != CFL_VARIANT_KVLIST) { + return -1; + } + + if (signals == 0) { + return -1; + } + + kvlist = variant->data.as_kvlist; + + name_var = cfl_kvlist_fetch(kvlist, \"name\"); + if (!name_var || name_var->type != CFL_VARIANT_STRING) { + return -1; + } + + route = flb_calloc(1, sizeof(struct flb_route)); + if (!route) { + flb_errno(); + return -1; + } + cfl_list_init(&route->_head); + cfl_list_init(&route->outputs); + cfl_list_init(&route->processors); + route->signals = signals; + + route->name = copy_from_cfl_sds(name_var->data.as_string); + if (!route->name) { + flb_free(route); + return -1; + } + + if (route_name_exists(&input->routes, route->name)) { + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + + + condition_var = cfl_kvlist_fetch(kvlist, \"condition\"); + if (condition_var) { + route->condition = parse_condition(condition_var, route->signals); + if (!route->condition) { + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + } + + processors_var = cfl_kvlist_fetch(kvlist, \"processors\"); + if (processors_var) { + if (parse_processors(processors_var, &route->processors, config) != 0) { + if (route->condition) { + route_condition_destroy(route->condition); + } + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + } + + to_var = cfl_kvlist_fetch(kvlist, \"to\"); + if (!to_var) { + if (route->condition) { + route_condition_destroy(route->condition); + } + route_processors_destroy(&route->processors); + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + + if (to_var->type == CFL_VARIANT_KVLIST) { + outputs_var = cfl_kvlist_fetch(to_var->data.as_kvlist, \"outputs\"); + } + else { + outputs_var = to_var; + } + + if (!outputs_var || parse_outputs(outputs_var, route) != 0 || + cfl_list_is_empty(&route->outputs) == 1) { + if (route->condition) { + route_condition_destroy(route->condition); + } + route_processors_destroy(&route->processors); + route_outputs_destroy(route); + flb_sds_destroy(route->name); + flb_free(route); + return -1; + } + + cfl_list_add(&route->_head, &input->routes); + return 0; +} + +static int parse_routes_block(struct cfl_variant *variant, + struct flb_input_routes *input, + struct flb_config *config, + uint32_t signals) +{ + struct cfl_array *array; + size_t idx; + + if (!variant) { + return -1; + } + + if (variant->type == CFL_VARIANT_ARRAY) { + array = variant->data.as_array; + if (cfl_array_size(array) == 0) { + return -1; + } + + for (idx = 0; idx < cfl_array_size(array); idx++) { + struct cfl_variant *entry; + + entry = cfl_array_fetch_by_index(array, idx); + if (!entry) { + return -1; + } + + if (parse_route(entry, input, config, signals) != 0) { + return -1; + } + } + + return 0; + } + + if (variant->type == CFL_VARIANT_KVLIST) { + if (parse_route(variant, input, config, signals) != 0) { + return -1; + } + return 0; + } + + return -1; +} + +static int parse_input_section(struct flb_cf_section *section, + struct cfl_list *input_routes, + struct flb_config *config) +{ + struct flb_input_routes *input; + struct cfl_kvlist *kvlist; + struct cfl_variant *name_var; + struct cfl_variant *processors_var; + struct cfl_variant *routes_var; + struct cfl_kvlist *routes_kvlist; + struct cfl_list *head; + struct cfl_kvpair *pair; + uint32_t mask; + size_t before_count; + + if (!section || !input_routes) { + return -1; + } + + kvlist = section->properties; + if (!kvlist) { + return -1; + } + + name_var = cfl_kvlist_fetch(kvlist, "name"); + if (!name_var || name_var->type != CFL_VARIANT_STRING) { + return -1; + } + + input = flb_calloc(1, sizeof(struct flb_input_routes)); + if (!input) { + flb_errno(); + return -1; + } + + cfl_list_init(&input->_head); + cfl_list_init(&input->processors); + cfl_list_init(&input->routes); + + input->input_name = copy_from_cfl_sds(name_var->data.as_string); + if (!input->input_name) { + flb_free(input); + return -1; + } + + processors_var = cfl_kvlist_fetch(kvlist, "processors"); + if (processors_var) { + if (parse_processors(processors_var, &input->processors, config) != 0) { + goto error; + } + } + + routes_var = cfl_kvlist_fetch(kvlist, "routes"); + if (!routes_var || routes_var->type != CFL_VARIANT_KVLIST) { + goto error; + } + + routes_kvlist = routes_var->data.as_kvlist; + if (cfl_list_is_empty(&routes_kvlist->list) == 1) { + goto error; + } + + cfl_list_foreach(head, &routes_kvlist->list) { + pair = cfl_list_entry(head, struct cfl_kvpair, _head); + if (!pair || !pair->key) { + goto error; + } + + mask = parse_signal_key(pair->key); + if (mask == 0) { + goto error; + } + + if (!pair->val) { + goto error; + } + + before_count = cfl_list_size(&input->routes); + if (parse_routes_block(pair->val, input, config, mask) != 0 || + cfl_list_size(&input->routes) == before_count) { + goto error; + } + } + + if (cfl_list_is_empty(&input->routes) == 1) { + goto error; + } + + cfl_list_add(&input->_head, input_routes); + return 0; + +error: + flb_router_routes_destroy(&input->routes); + route_processors_destroy(&input->processors); + if (input->input_name) { + flb_sds_destroy(input->input_name); + } + flb_free(input); + return -1; +} + +int flb_router_config_parse(struct flb_cf *cf, + struct cfl_list *input_routes, + struct flb_config *config) +{ + struct mk_list *head; + struct flb_cf_section *section; + + if (!cf || !input_routes) { + return -1; + } + + cfl_list_init(input_routes); + + mk_list_foreach(head, &cf->inputs) { + section = mk_list_entry(head, struct flb_cf_section, _head_section); + if (parse_input_section(section, input_routes, config) != 0) { + flb_router_routes_destroy(input_routes); + cfl_list_init(input_routes); + return -1; + } + } + + if (cfl_list_is_empty(input_routes) == 1) { + flb_router_routes_destroy(input_routes); + cfl_list_init(input_routes); + return -1; + } + + return 0; +} + diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index a270f1dc3c8..0c8e984ed8f 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -13,6 +13,7 @@ set(UNIT_TESTS_FILES kv.c slist.c router.c + router_config.c network.c unit_sizes.c hashtable.c diff --git a/tests/internal/data/config_format/yaml/routing/basic.yaml b/tests/internal/data/config_format/yaml/routing/basic.yaml new file mode 100644 index 00000000000..a7597565e2e --- /dev/null +++ b/tests/internal/data/config_format/yaml/routing/basic.yaml @@ -0,0 +1,38 @@ +--- +pipeline: + inputs: + - name: opentelemetry + processors: + - name: parser + parser: json + routes: + logs: + - name: error_logs + condition: + rules: + - field: "$level" + op: eq + value: "error" + to: + outputs: + - name: loki + fallback: s3_backup + - name: default + condition: + default: true + to: + outputs: + - name: elasticsearch + metrics: + - name: metrics_above_threshold + condition: + rules: + - field: "$metric.name" + op: regex + value: "^cpu_" + - field: "$metric.value" + op: gt + value: 0.9 + to: + outputs: + - name: prometheus_remote diff --git a/tests/internal/data/config_format/yaml/routing/metrics.yaml b/tests/internal/data/config_format/yaml/routing/metrics.yaml new file mode 100644 index 00000000000..7aaa6e2030d --- /dev/null +++ b/tests/internal/data/config_format/yaml/routing/metrics.yaml @@ -0,0 +1,19 @@ +--- +pipeline: + inputs: + - name: metrics + routes: + metrics: + - name: cpu_hot + condition: + rules: + - field: "$metric.name" + op: eq + value: "cpu_usage" + - field: "$resource['service.name']" + op: eq + value: "checkout" + to: + outputs: + - name: prometheus_remote + fallback: s3_backup diff --git a/tests/internal/data/config_format/yaml/routing/multi_signal.yaml b/tests/internal/data/config_format/yaml/routing/multi_signal.yaml new file mode 100644 index 00000000000..46553e9e6e2 --- /dev/null +++ b/tests/internal/data/config_format/yaml/routing/multi_signal.yaml @@ -0,0 +1,27 @@ +--- +pipeline: + inputs: + - name: telemetry + routes: + "logs,traces": + - name: service_checkout + condition: + rules: + - field: "$resource['service.name']" + op: eq + value: "checkout" + - field: "$span.duration_ms" + op: gt + value: 2000 + to: + outputs: + - name: tempo + logs: + - name: catch_all + condition: + default: true + to: + outputs: + - name: loki + - name: s3_archive + fallback: glacier diff --git a/tests/internal/router_config.c b/tests/internal/router_config.c new file mode 100644 index 00000000000..845cfb5e504 --- /dev/null +++ b/tests/internal/router_config.c @@ -0,0 +1,648 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include "flb_tests_internal.h" + +#ifdef _WIN32 +#define FLB_ROUTER_TEST_FILE(name) \ + FLB_TESTS_DATA_PATH "\\data\\config_format\\yaml\\routing\\" name +#else +#define FLB_ROUTER_TEST_FILE(name) \ + FLB_TESTS_DATA_PATH "/data/config_format/yaml/routing/" name +#endif + +static struct cfl_variant *clone_variant(struct cfl_variant *var); + +static struct cfl_array *clone_array(struct cfl_array *array) +{ + struct cfl_array *copy; + struct cfl_variant *entry; + struct cfl_variant *entry_copy; + size_t idx; + + if (!array) { + return NULL; + } + + copy = cfl_array_create(cfl_array_size(array)); + if (!copy) { + return NULL; + } + + for (idx = 0; idx < cfl_array_size(array); idx++) { + entry = cfl_array_fetch_by_index(array, idx); + entry_copy = clone_variant(entry); + if (!entry_copy) { + cfl_array_destroy(copy); + return NULL; + } + + if (cfl_array_append(copy, entry_copy) != 0) { + cfl_variant_destroy(entry_copy); + cfl_array_destroy(copy); + return NULL; + } + } + + return copy; +} + +static struct cfl_kvlist *clone_kvlist(struct cfl_kvlist *kvlist) +{ + struct cfl_kvlist *copy; + struct cfl_list *head; + struct cfl_kvpair *pair; + struct cfl_variant *value_copy; + + if (!kvlist) { + return NULL; + } + + copy = cfl_kvlist_create(); + if (!copy) { + return NULL; + } + + cfl_list_foreach(head, &kvlist->list) { + pair = cfl_list_entry(head, struct cfl_kvpair, _head); + value_copy = clone_variant(pair->val); + if (!value_copy) { + cfl_kvlist_destroy(copy); + return NULL; + } + + if (cfl_kvlist_insert_s(copy, + pair->key, + cfl_sds_len(pair->key), + value_copy) != 0) { + cfl_variant_destroy(value_copy); + cfl_kvlist_destroy(copy); + return NULL; + } + } + + return copy; +} + +static struct cfl_variant *clone_variant(struct cfl_variant *var) +{ + if (!var) { + return NULL; + } + + switch (var->type) { + case CFL_VARIANT_STRING: + return cfl_variant_create_from_string_s(var->data.as_string, + cfl_sds_len(var->data.as_string), + CFL_FALSE); + case CFL_VARIANT_BOOL: + return cfl_variant_create_from_bool(var->data.as_bool); + case CFL_VARIANT_INT: + return cfl_variant_create_from_int64(var->data.as_int64); + case CFL_VARIANT_UINT: + return cfl_variant_create_from_uint64(var->data.as_uint64); + case CFL_VARIANT_DOUBLE: + return cfl_variant_create_from_double(var->data.as_double); + case CFL_VARIANT_NULL: + return cfl_variant_create_from_null(); + case CFL_VARIANT_ARRAY: { + struct cfl_array *copy = clone_array(var->data.as_array); + if (!copy) { + return NULL; + } + return cfl_variant_create_from_array(copy); + } + case CFL_VARIANT_KVLIST: { + struct cfl_kvlist *copy = clone_kvlist(var->data.as_kvlist); + if (!copy) { + return NULL; + } + return cfl_variant_create_from_kvlist(copy); + } + default: + break; + } + + return NULL; +} + +static struct flb_cf *cf_from_inputs_variant(struct cfl_variant *inputs) +{ + struct flb_cf *cf; + struct cfl_array *array; + size_t idx; + + if (!inputs || inputs->type != CFL_VARIANT_ARRAY) { + return NULL; + } + + cf = flb_cf_create(); + if (!cf) { + return NULL; + } + + array = inputs->data.as_array; + for (idx = 0; idx < cfl_array_size(array); idx++) { + struct cfl_variant *entry; + struct cfl_kvlist *copy; + struct flb_cf_section *section; + + entry = cfl_array_fetch_by_index(array, idx); + if (!entry || entry->type != CFL_VARIANT_KVLIST) { + flb_cf_destroy(cf); + return NULL; + } + + copy = clone_kvlist(entry->data.as_kvlist); + if (!copy) { + flb_cf_destroy(cf); + return NULL; + } + + section = flb_cf_section_create(cf, "input", 5); + if (!section) { + cfl_kvlist_destroy(copy); + flb_cf_destroy(cf); + return NULL; + } + + cfl_kvlist_destroy(section->properties); + section->properties = copy; + } + + return cf; +} + +static struct flb_cf *load_cf_from_yaml(const char *path) +{ + return flb_cf_yaml_create(NULL, (char *) path, NULL, 0); +} + +static struct cfl_variant *create_inputs_variant() +{ + struct cfl_array *inputs; + struct cfl_kvlist *input; + struct cfl_array *processors; + struct cfl_kvlist *proc; + struct cfl_kvlist *routes; + struct cfl_array *log_routes; + struct cfl_kvlist *route; + struct cfl_kvlist *condition; + struct cfl_array *rules; + struct cfl_kvlist *rule_kv; + struct cfl_variant *rule_variant; + struct cfl_array *outputs; + struct cfl_kvlist *to; + struct cfl_kvlist *output_obj; + struct cfl_kvlist *default_condition; + struct cfl_variant *inputs_variant; + + inputs = cfl_array_create(1); + TEST_CHECK(inputs != NULL); + if (!inputs) { + return NULL; + } + + input = cfl_kvlist_create(); + TEST_CHECK(input != NULL); + if (!input) { + cfl_array_destroy(inputs); + return NULL; + } + + TEST_CHECK(cfl_kvlist_insert_string(input, "name", "opentelemetry") == 0); + + processors = cfl_array_create(1); + TEST_CHECK(processors != NULL); + proc = cfl_kvlist_create(); + TEST_CHECK(proc != NULL); + TEST_CHECK(cfl_kvlist_insert_string(proc, "name", "parser") == 0); + TEST_CHECK(cfl_kvlist_insert_string(proc, "parser", "json") == 0); + TEST_CHECK(cfl_array_append(processors, cfl_variant_create_from_kvlist(proc)) == 0); + TEST_CHECK(cfl_kvlist_insert_array(input, "processors", processors) == 0); + + routes = cfl_kvlist_create(); + TEST_CHECK(routes != NULL); + log_routes = cfl_array_create(2); + TEST_CHECK(log_routes != NULL); + + /* error_logs route */ + route = cfl_kvlist_create(); + TEST_CHECK(route != NULL); + TEST_CHECK(cfl_kvlist_insert_string(route, "name", "error_logs") == 0); + + condition = cfl_kvlist_create(); + TEST_CHECK(condition != NULL); + rules = cfl_array_create(1); + TEST_CHECK(rules != NULL); + + rule_kv = cfl_kvlist_create(); + TEST_CHECK(rule_kv != NULL); + TEST_CHECK(cfl_kvlist_insert_string(rule_kv, "field", "$level") == 0); + TEST_CHECK(cfl_kvlist_insert_string(rule_kv, "op", "eq") == 0); + TEST_CHECK(cfl_kvlist_insert_string(rule_kv, "value", "error") == 0); + rule_variant = cfl_variant_create_from_kvlist(rule_kv); + TEST_CHECK(rule_variant != NULL); + TEST_CHECK(cfl_array_append(rules, rule_variant) == 0); + + TEST_CHECK(cfl_kvlist_insert_array(condition, "rules", rules) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "condition", condition) == 0); + + outputs = cfl_array_create(1); + TEST_CHECK(outputs != NULL); + TEST_CHECK(cfl_array_append_string(outputs, "loki") == 0); + to = cfl_kvlist_create(); + TEST_CHECK(to != NULL); + TEST_CHECK(cfl_kvlist_insert_array(to, "outputs", outputs) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "to", to) == 0); + + TEST_CHECK(cfl_array_append(log_routes, cfl_variant_create_from_kvlist(route)) == 0); + + /* default route */ + route = cfl_kvlist_create(); + TEST_CHECK(route != NULL); + TEST_CHECK(cfl_kvlist_insert_string(route, "name", "default") == 0); + + default_condition = cfl_kvlist_create(); + TEST_CHECK(default_condition != NULL); + TEST_CHECK(cfl_kvlist_insert_bool(default_condition, "default", 1) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "condition", default_condition) == 0); + + outputs = cfl_array_create(1); + TEST_CHECK(outputs != NULL); + output_obj = cfl_kvlist_create(); + TEST_CHECK(output_obj != NULL); + TEST_CHECK(cfl_kvlist_insert_string(output_obj, "name", "elasticsearch") == 0); + TEST_CHECK(cfl_kvlist_insert_string(output_obj, "fallback", "s3_backup") == 0); + TEST_CHECK(cfl_array_append(outputs, cfl_variant_create_from_kvlist(output_obj)) == 0); + + to = cfl_kvlist_create(); + TEST_CHECK(to != NULL); + TEST_CHECK(cfl_kvlist_insert_array(to, "outputs", outputs) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "to", to) == 0); + + TEST_CHECK(cfl_array_append(log_routes, cfl_variant_create_from_kvlist(route)) == 0); + + TEST_CHECK(cfl_kvlist_insert_array(routes, "logs", log_routes) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(input, "routes", routes) == 0); + TEST_CHECK(cfl_array_append(inputs, cfl_variant_create_from_kvlist(input)) == 0); + + inputs_variant = cfl_variant_create_from_array(inputs); + TEST_CHECK(inputs_variant != NULL); + + return inputs_variant; +} +static struct cfl_variant *create_duplicate_route_inputs() +{ + struct cfl_array *inputs; + struct cfl_kvlist *input; + struct cfl_kvlist *routes; + struct cfl_array *log_routes; + struct cfl_kvlist *route; + struct cfl_kvlist *condition; + struct cfl_kvlist *to; + struct cfl_array *outputs; + int idx; + + inputs = cfl_array_create(1); + TEST_CHECK(inputs != NULL); + if (!inputs) { + return NULL; + } + + input = cfl_kvlist_create(); + TEST_CHECK(input != NULL); + if (!input) { + cfl_array_destroy(inputs); + return NULL; + } + + TEST_CHECK(cfl_kvlist_insert_string(input, "name", "duplicate") == 0); + + routes = cfl_kvlist_create(); + TEST_CHECK(routes != NULL); + log_routes = cfl_array_create(2); + TEST_CHECK(log_routes != NULL); + + for (idx = 0; idx < 2; idx++) { + route = cfl_kvlist_create(); + TEST_CHECK(route != NULL); + TEST_CHECK(cfl_kvlist_insert_string(route, "name", "dup") == 0); + + condition = cfl_kvlist_create(); + TEST_CHECK(condition != NULL); + TEST_CHECK(cfl_kvlist_insert_bool(condition, "default", 1) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "condition", condition) == 0); + + outputs = cfl_array_create(1); + TEST_CHECK(outputs != NULL); + if (idx == 0) { + TEST_CHECK(cfl_array_append_string(outputs, "primary") == 0); + } + else { + TEST_CHECK(cfl_array_append_string(outputs, "secondary") == 0); + } + + to = cfl_kvlist_create(); + TEST_CHECK(to != NULL); + TEST_CHECK(cfl_kvlist_insert_array(to, "outputs", outputs) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(route, "to", to) == 0); + + TEST_CHECK(cfl_array_append(log_routes, cfl_variant_create_from_kvlist(route)) == 0); + } + + TEST_CHECK(cfl_kvlist_insert_array(routes, "logs", log_routes) == 0); + TEST_CHECK(cfl_kvlist_insert_kvlist(input, "routes", routes) == 0); + TEST_CHECK(cfl_array_append(inputs, cfl_variant_create_from_kvlist(input)) == 0); + + return cfl_variant_create_from_array(inputs); +} +void test_router_config_parse_basic() +{ + struct cfl_list routes; + struct cfl_variant *inputs; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct flb_route *first_route; + struct flb_route *second_route; + struct flb_route_output *output; + struct cfl_list *head; + struct cfl_list *route_head; + int ret; + + cfl_list_init(&routes); + + inputs = create_inputs_variant(); + TEST_CHECK(inputs != NULL); + if (!inputs) { + return; + } + + cf = cf_from_inputs_variant(inputs); + TEST_CHECK(cf != NULL); + if (!cf) { + cfl_variant_destroy(inputs); + return; + } + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(cfl_list_size(&routes) == 1); + head = routes.next; + input_routes = cfl_list_entry(head, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "opentelemetry") == 0); + TEST_CHECK(cfl_list_size(&input_routes->processors) == 1); + TEST_CHECK(cfl_list_size(&input_routes->routes) == 2); + + route_head = input_routes->routes.next; + first_route = cfl_list_entry(route_head, struct flb_route, _head); + TEST_CHECK(strcmp(first_route->name, "error_logs") == 0); + TEST_CHECK(first_route->signals == FLB_ROUTER_SIGNAL_LOGS); + TEST_CHECK(first_route->condition != NULL); + TEST_CHECK(cfl_list_size(&first_route->outputs) == 1); + output = cfl_list_entry(first_route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "loki") == 0); + + route_head = route_head->next; + second_route = cfl_list_entry(route_head, struct flb_route, _head); + TEST_CHECK(second_route->condition != NULL); + TEST_CHECK(second_route->condition->is_default == FLB_TRUE); + TEST_CHECK(cfl_list_size(&second_route->outputs) == 1); + TEST_CHECK(second_route->signals == FLB_ROUTER_SIGNAL_LOGS); + output = cfl_list_entry(second_route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->fallback, "s3_backup") == 0); + + flb_router_routes_destroy(&routes); + } + + if (ret != 0) { + cfl_list_init(&routes); + } + + flb_cf_destroy(cf); + cfl_variant_destroy(inputs); +} + +void test_router_config_duplicate_route() +{ + struct cfl_list routes; + struct cfl_variant *inputs; + struct flb_cf *cf; + int ret; + + cfl_list_init(&routes); + + inputs = create_duplicate_route_inputs(); + TEST_CHECK(inputs != NULL); + if (!inputs) { + return; + } + + cf = cf_from_inputs_variant(inputs); + TEST_CHECK(cf != NULL); + if (!cf) { + cfl_variant_destroy(inputs); + return; + } + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret != 0); + TEST_CHECK(cfl_list_is_empty(&routes) == 1); + + flb_cf_destroy(cf); + cfl_variant_destroy(inputs); +} + +void test_router_config_parse_file_basic() +{ + struct cfl_list routes; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct cfl_list *routes_head; + struct cfl_list *route_head; + struct flb_route *route; + struct flb_route_output *output; + int seen_error; + int seen_metrics; + int seen_default; + int ret; + + cf = load_cf_from_yaml(FLB_ROUTER_TEST_FILE("basic.yaml")); + TEST_CHECK(cf != NULL); + if (!cf) { + return; + } + + cfl_list_init(&routes); + seen_error = FLB_FALSE; + seen_metrics = FLB_FALSE; + seen_default = FLB_FALSE; + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret == 0) { + routes_head = routes.next; + input_routes = cfl_list_entry(routes_head, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "opentelemetry") == 0); + TEST_CHECK(cfl_list_size(&input_routes->routes) == 3); + + cfl_list_foreach(route_head, &input_routes->routes) { + route = cfl_list_entry(route_head, struct flb_route, _head); + if (strcmp(route->name, "error_logs") == 0) { + seen_error = FLB_TRUE; + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_LOGS); + output = cfl_list_entry(route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "loki") == 0); + TEST_CHECK(strcmp(output->fallback, "s3_backup") == 0); + } + else if (strcmp(route->name, "metrics_above_threshold") == 0) { + seen_metrics = FLB_TRUE; + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_METRICS); + } + else if (strcmp(route->name, "default") == 0) { + seen_default = FLB_TRUE; + TEST_CHECK(route->condition != NULL); + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_LOGS); + TEST_CHECK(route->condition->is_default == FLB_TRUE); + output = cfl_list_entry(route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "elasticsearch") == 0); + } + } + + TEST_CHECK(seen_error == FLB_TRUE); + TEST_CHECK(seen_metrics == FLB_TRUE); + TEST_CHECK(seen_default == FLB_TRUE); + + flb_router_routes_destroy(&routes); + } + + flb_cf_destroy(cf); +} + +void test_router_config_parse_file_multi_signal() +{ + struct cfl_list routes; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct flb_route *route; + struct flb_route_output *output; + struct cfl_list *route_head; + struct cfl_list *output_head; + int seen_multi; + int seen_default; + int outputs; + int ret; + + cf = load_cf_from_yaml(FLB_ROUTER_TEST_FILE("multi_signal.yaml")); + TEST_CHECK(cf != NULL); + if (!cf) { + return; + } + + cfl_list_init(&routes); + seen_multi = FLB_FALSE; + seen_default = FLB_FALSE; + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret == 0) { + input_routes = cfl_list_entry(routes.next, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "telemetry") == 0); + + cfl_list_foreach(route_head, &input_routes->routes) { + route = cfl_list_entry(route_head, struct flb_route, _head); + if (strcmp(route->name, "service_checkout") == 0) { + seen_multi = FLB_TRUE; + TEST_CHECK(route->signals == + (FLB_ROUTER_SIGNAL_LOGS | FLB_ROUTER_SIGNAL_TRACES)); + } + else if (strcmp(route->name, "catch_all") == 0) { + seen_default = FLB_TRUE; + TEST_CHECK(route->condition != NULL && + route->condition->is_default == FLB_TRUE); + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_LOGS); + + outputs = 0; + cfl_list_foreach(output_head, &route->outputs) { + output = cfl_list_entry(output_head, struct flb_route_output, _head); + outputs++; + if (strcmp(output->name, "s3_archive") == 0) { + TEST_CHECK(strcmp(output->fallback, "glacier") == 0); + } + } + TEST_CHECK(outputs == 2); + } + } + + TEST_CHECK(seen_multi == FLB_TRUE); + TEST_CHECK(seen_default == FLB_TRUE); + + flb_router_routes_destroy(&routes); + } + + flb_cf_destroy(cf); +} + +void test_router_config_parse_file_metrics() +{ + struct cfl_list routes; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct flb_route *route; + struct flb_route_output *output; + int ret; + + cf = load_cf_from_yaml(FLB_ROUTER_TEST_FILE("metrics.yaml")); + TEST_CHECK(cf != NULL); + if (!cf) { + return; + } + + cfl_list_init(&routes); + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret == 0) { + input_routes = cfl_list_entry(routes.next, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "metrics") == 0); + TEST_CHECK(cfl_list_size(&input_routes->routes) == 1); + + route = cfl_list_entry(input_routes->routes.next, + struct flb_route, _head); + TEST_CHECK(strcmp(route->name, "cpu_hot") == 0); + TEST_CHECK(route->signals == FLB_ROUTER_SIGNAL_METRICS); + + output = cfl_list_entry(route->outputs.next, + struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "prometheus_remote") == 0); + TEST_CHECK(strcmp(output->fallback, "s3_backup") == 0); + + flb_router_routes_destroy(&routes); + } + + flb_cf_destroy(cf); +} + +TEST_LIST = { + { "parse_basic", test_router_config_parse_basic }, + { "duplicate_route", test_router_config_duplicate_route }, + { "parse_basic_file", test_router_config_parse_file_basic }, + { "parse_multi_signal_file", test_router_config_parse_file_multi_signal }, + { "parse_metrics_file", test_router_config_parse_file_metrics }, + { 0 } +}; diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index 01838c68d27..f5258e5ea0c 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -201,6 +201,7 @@ if(FLB_IN_LIB) FLB_RT_TEST(FLB_OUT_LIB "core_engine.c") FLB_RT_TEST(FLB_OUT_LIB "core_log.c") FLB_RT_TEST(FLB_OUT_LIB "core_routes.c") + FLB_RT_TEST(FLB_OUT_LIB "router_precedence.c") FLB_RT_TEST(FLB_OUT_LIB "config_map_opts.c") FLB_RT_TEST(FLB_OUT_COUNTER "out_counter.c") FLB_RT_TEST(FLB_OUT_AZURE_KUSTO "out_azure_kusto.c") diff --git a/tests/runtime/data/router/precedence.yaml b/tests/runtime/data/router/precedence.yaml new file mode 100644 index 00000000000..b8df272fcc8 --- /dev/null +++ b/tests/runtime/data/router/precedence.yaml @@ -0,0 +1,15 @@ +--- +pipeline: + inputs: + - name: lib + routes: + logs: + - name: default_route + condition: + default: true + to: + outputs: + - name: lib_route + outputs: + - name: lib_route + type: lib diff --git a/tests/runtime/router_precedence.c b/tests/runtime/router_precedence.c new file mode 100644 index 00000000000..d886fd7421b --- /dev/null +++ b/tests/runtime/router_precedence.c @@ -0,0 +1,86 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +#include "flb_tests_runtime.h" + +static const char *router_yaml_path = +#ifdef _WIN32 + FLB_TESTS_DATA_PATH "\\data\\router\\precedence.yaml"; +#else + FLB_TESTS_DATA_PATH "/data/router/precedence.yaml"; +#endif + +void flb_test_route_default_precedence() +{ + struct cfl_list routes; + struct flb_cf *cf; + struct flb_input_routes *input_routes; + struct flb_route *route; + struct flb_route_output *output; + struct flb_event_chunk chunk; + int ret; + int match; + + cf = flb_cf_yaml_create(NULL, (char *) router_yaml_path, NULL, 0); + TEST_CHECK(cf != NULL); + if (!cf) { + return; + } + + cfl_list_init(&routes); + + ret = flb_router_config_parse(cf, &routes, NULL); + TEST_CHECK(ret == 0); + if (ret != 0) { + flb_cf_destroy(cf); + return; + } + + input_routes = cfl_list_entry(routes.next, struct flb_input_routes, _head); + TEST_CHECK(strcmp(input_routes->input_name, "lib") == 0); + + route = cfl_list_entry(input_routes->routes.next, struct flb_route, _head); + TEST_CHECK(route->condition != NULL); + TEST_CHECK(route->condition->is_default == FLB_TRUE); + + memset(&chunk, 0, sizeof(chunk)); + chunk.type = FLB_EVENT_TYPE_LOGS; + + TEST_CHECK(flb_route_condition_eval(&chunk, route) == FLB_TRUE); + + output = cfl_list_entry(route->outputs.next, struct flb_route_output, _head); + TEST_CHECK(strcmp(output->name, "lib_route") == 0); + + match = flb_router_match("lib.input", strlen("lib.input"), "does-not-match", NULL); + TEST_CHECK(match == FLB_FALSE); + + flb_router_routes_destroy(&routes); + flb_cf_destroy(cf); +} + +TEST_LIST = { + {"route_default_precedence", flb_test_route_default_precedence}, + {0} +};