From 33cdbbe3741ac2aaca4ce394f5f1eb561d6deae5 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 20 Oct 2025 12:26:08 -0600 Subject: [PATCH 01/30] router: implement new router paths and conditions Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_router.h | 9 +++++++++ src/flb_router.c | 2 ++ 2 files changed, 11 insertions(+) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index a8e4f2dddab..e7c0ba92610 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -26,11 +26,13 @@ #include #include #include +#include #include #include struct flb_router_path { struct flb_output_instance *ins; + struct flb_route *route; struct mk_list _head; }; @@ -74,12 +76,17 @@ struct flb_route_condition_rule { flb_sds_t field; flb_sds_t op; flb_sds_t value; + flb_sds_t *values; + size_t values_count; struct cfl_list _head; }; struct flb_route_condition { struct cfl_list rules; int is_default; + enum flb_condition_operator op; + struct flb_condition *compiled; + int compiled_status; }; struct flb_route_output { @@ -136,6 +143,8 @@ int flb_condition_eval_metrics(struct flb_event_chunk *chunk, struct flb_route *route); int flb_condition_eval_traces(struct flb_event_chunk *chunk, struct flb_route *route); +int flb_router_path_should_route(struct flb_event_chunk *chunk, + struct flb_router_path *path); struct flb_cf; diff --git a/src/flb_router.c b/src/flb_router.c index 9312f5ffed0..0c8f749d835 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -155,6 +155,7 @@ int flb_router_connect(struct flb_input_instance *in, } p->ins = out; + p->route = NULL; mk_list_add(&p->_head, &in->routes); return 0; @@ -172,6 +173,7 @@ int flb_router_connect_direct(struct flb_input_instance *in, } p->ins = out; + p->route = NULL; mk_list_add(&p->_head, &in->routes_direct); return 0; From ac42f6770c85d2033f92aaf3fc945c3c3f366b96 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 20 Oct 2025 12:27:05 -0600 Subject: [PATCH 02/30] router_config: add support for rules Signed-off-by: Eduardo Silva --- src/flb_router_config.c | 96 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 2 deletions(-) diff --git a/src/flb_router_config.c b/src/flb_router_config.c index 0513a11afd2..2a3a50771f9 100644 --- a/src/flb_router_config.c +++ b/src/flb_router_config.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -276,10 +277,24 @@ static void route_condition_destroy(struct flb_route_condition *condition) if (rule->value) { flb_sds_destroy(rule->value); } + if (rule->values) { + size_t idx; + + for (idx = 0; idx < rule->values_count; idx++) { + if (rule->values[idx]) { + flb_sds_destroy(rule->values[idx]); + } + } + flb_free(rule->values); + } flb_free(rule); } + if (condition->compiled) { + flb_condition_destroy(condition->compiled); + } + flb_free(condition); } @@ -530,6 +545,8 @@ static struct flb_route_condition_rule *parse_condition_rule(struct cfl_variant return NULL; } cfl_list_init(&rule->_head); + rule->values = NULL; + rule->values_count = 0; rule->field = copy_from_cfl_sds(field_var->data.as_string); if (!rule->field) { @@ -544,9 +561,56 @@ static struct flb_route_condition_rule *parse_condition_rule(struct cfl_variant return NULL; } - if (value_var) { + if (!value_var) { + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + + if (value_var->type == CFL_VARIANT_ARRAY) { + struct cfl_array *array; + struct cfl_variant *entry; + size_t idx; + + array = value_var->data.as_array; + if (!array || cfl_array_size(array) == 0) { + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + + rule->values = flb_calloc(cfl_array_size(array), sizeof(flb_sds_t)); + if (!rule->values) { + flb_errno(); + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + + for (idx = 0; idx < cfl_array_size(array); idx++) { + entry = cfl_array_fetch_by_index(array, idx); + rule->values[idx] = variant_to_sds(entry); + if (!rule->values[idx]) { + size_t j; + + for (j = 0; j < idx; j++) { + flb_sds_destroy(rule->values[j]); + } + flb_free(rule->values); + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + } + rule->values_count = cfl_array_size(array); + } + else { rule->value = variant_to_sds(value_var); - if (!rule->value && strcmp(rule->op, "exists") != 0) { + if (!rule->value) { flb_sds_destroy(rule->op); flb_sds_destroy(rule->field); flb_free(rule); @@ -563,6 +627,7 @@ static struct flb_route_condition *parse_condition(struct cfl_variant *variant, struct flb_route_condition *condition; struct cfl_variant *rules_var; struct cfl_variant *default_var; + struct cfl_variant *op_var; struct cfl_array *rules_array; struct cfl_variant *entry; struct flb_route_condition_rule *rule; @@ -579,9 +644,31 @@ static struct flb_route_condition *parse_condition(struct cfl_variant *variant, return NULL; } cfl_list_init(&condition->rules); + condition->op = FLB_COND_OP_AND; + condition->compiled = NULL; + condition->compiled_status = 0; rules_var = cfl_kvlist_fetch(variant->data.as_kvlist, "rules"); default_var = cfl_kvlist_fetch(variant->data.as_kvlist, "default"); + op_var = cfl_kvlist_fetch(variant->data.as_kvlist, "op"); + + if (op_var) { + if (op_var->type != CFL_VARIANT_STRING) { + route_condition_destroy(condition); + return NULL; + } + + if (strcasecmp(op_var->data.as_string, "and") == 0) { + condition->op = FLB_COND_OP_AND; + } + else if (strcasecmp(op_var->data.as_string, "or") == 0) { + condition->op = FLB_COND_OP_OR; + } + else { + route_condition_destroy(condition); + return NULL; + } + } if (default_var) { if (variant_to_bool(default_var, &val) != 0) { @@ -1220,6 +1307,11 @@ int flb_router_apply_config(struct flb_config *config) } if (flb_router_connect_direct(input_ins, output_ins) == 0) { + struct flb_router_path *path; + + path = mk_list_entry_last(&input_ins->routes_direct, + struct flb_router_path, _head); + path->route = route; created++; flb_debug("[router] connected input '%s' route '%s' to output '%s'", flb_input_name(input_ins), From 839a3979f2e0bc5765445ad5290adddc629b1b48 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 20 Oct 2025 12:29:33 -0600 Subject: [PATCH 03/30] router_condition: add conditional logic for logs Signed-off-by: Eduardo Silva --- src/flb_router_condition.c | 287 ++++++++++++++++++++++++++++++++++--- 1 file changed, 270 insertions(+), 17 deletions(-) diff --git a/src/flb_router_condition.c b/src/flb_router_condition.c index 318227a094a..fa1ad515954 100644 --- a/src/flb_router_condition.c +++ b/src/flb_router_condition.c @@ -20,6 +20,16 @@ #include #include #include +#include +#include +#include +#include + +#define FLB_ROUTE_CONDITION_COMPILED_SUCCESS 1 +#define FLB_ROUTE_CONDITION_COMPILED_FAILURE -1 + +static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition); +static void route_condition_record_destroy(struct flb_mp_chunk_record *record); uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk) { @@ -44,18 +54,68 @@ uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk) int flb_condition_eval_logs(struct flb_event_chunk *chunk, struct flb_route *route) { - (void) chunk; - (void) route; + int ret; + int result = FLB_FALSE; + struct flb_route_condition *condition; + struct flb_condition *compiled; + struct flb_log_event_decoder decoder; + struct flb_log_event event; + struct flb_mp_chunk_record record; - /* - * The full condition evaluation engine requires field resolvers that map - * record accessors to the different telemetry payload shapes. The wiring - * of those resolvers is part of a bigger effort and will be implemented in - * follow-up changes. For the time being we simply report that the - * condition failed so that the runtime can rely on explicit default - * routes. - */ - return FLB_FALSE; + if (!chunk || !route || !route->condition) { + return FLB_FALSE; + } + + if (!chunk->data || chunk->size == 0) { + return FLB_FALSE; + } + + condition = route->condition; + + compiled = route_condition_get_compiled(condition); + if (!compiled) { + return FLB_FALSE; + } + + ret = flb_log_event_decoder_init(&decoder, chunk->data, chunk->size); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + return FLB_FALSE; + } + + flb_log_event_decoder_read_groups(&decoder, FLB_TRUE); + + while ((ret = flb_log_event_decoder_next(&decoder, &event)) == FLB_EVENT_DECODER_SUCCESS) { + memset(&record, 0, sizeof(record)); + record.event = event; + + if (event.metadata) { + record.cobj_metadata = flb_mp_object_to_cfl(event.metadata); + if (!record.cobj_metadata) { + route_condition_record_destroy(&record); + break; + } + } + + if (event.body) { + record.cobj_record = flb_mp_object_to_cfl(event.body); + if (!record.cobj_record) { + route_condition_record_destroy(&record); + break; + } + } + + if (flb_condition_evaluate(compiled, &record) == FLB_TRUE) { + result = FLB_TRUE; + route_condition_record_destroy(&record); + break; + } + + route_condition_record_destroy(&record); + } + + flb_log_event_decoder_destroy(&decoder); + + return result; } int flb_condition_eval_metrics(struct flb_event_chunk *chunk, @@ -94,8 +154,7 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk, return FLB_FALSE; } - if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && - ((route->signals & signal) == 0)) { + if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && ((route->signals & signal) == 0)) { return FLB_FALSE; } @@ -103,10 +162,6 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk, return FLB_TRUE; } - if (cfl_list_is_empty(&route->condition->rules) == 0) { - return FLB_TRUE; - } - switch (signal) { case FLB_ROUTER_SIGNAL_LOGS: return flb_condition_eval_logs(chunk, route); @@ -121,3 +176,201 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk, return FLB_FALSE; } +int flb_router_path_should_route(struct flb_event_chunk *chunk, + struct flb_router_path *path) +{ + if (!path) { + return FLB_FALSE; + } + + if (!path->route) { + return FLB_TRUE; + } + + return flb_route_condition_eval(chunk, path->route); +} + +static int parse_rule_operator(const flb_sds_t op_str, + enum flb_rule_operator *out) +{ + if (!op_str || !out) { + return -1; + } + + if (strcasecmp(op_str, "eq") == 0) { + *out = FLB_RULE_OP_EQ; + } + else if (strcasecmp(op_str, "neq") == 0) { + *out = FLB_RULE_OP_NEQ; + } + else if (strcasecmp(op_str, "gt") == 0) { + *out = FLB_RULE_OP_GT; + } + else if (strcasecmp(op_str, "lt") == 0) { + *out = FLB_RULE_OP_LT; + } + else if (strcasecmp(op_str, "gte") == 0) { + *out = FLB_RULE_OP_GTE; + } + else if (strcasecmp(op_str, "lte") == 0) { + *out = FLB_RULE_OP_LTE; + } + else if (strcasecmp(op_str, "regex") == 0) { + *out = FLB_RULE_OP_REGEX; + } + else if (strcasecmp(op_str, "not_regex") == 0) { + *out = FLB_RULE_OP_NOT_REGEX; + } + else if (strcasecmp(op_str, "in") == 0) { + *out = FLB_RULE_OP_IN; + } + else if (strcasecmp(op_str, "not_in") == 0) { + *out = FLB_RULE_OP_NOT_IN; + } + else { + return -1; + } + + return 0; +} + +static int parse_numeric_value(flb_sds_t value, double *out) +{ + char *endptr = NULL; + double result; + + if (!value || !out) { + return -1; + } + + errno = 0; + result = strtod(value, &endptr); + if (errno == ERANGE || endptr == value || (endptr && *endptr != '\0')) { + return -1; + } + + *out = result; + return 0; +} + +static struct flb_condition *route_condition_compile(struct flb_route_condition *condition) +{ + int ret; + double numeric_value; + enum flb_rule_operator op; + struct cfl_list *head; + struct flb_condition *compiled; + struct flb_route_condition_rule *rule; + + compiled = flb_condition_create(condition->op); + if (!compiled) { + return NULL; + } + + cfl_list_foreach(head, &condition->rules) { + rule = cfl_list_entry(head, struct flb_route_condition_rule, _head); + + if (!rule->field || !rule->op) { + flb_condition_destroy(compiled); + return NULL; + } + + if (parse_rule_operator(rule->op, &op) != 0) { + flb_condition_destroy(compiled); + return NULL; + } + + switch (op) { + case FLB_RULE_OP_EQ: + case FLB_RULE_OP_NEQ: + case FLB_RULE_OP_REGEX: + case FLB_RULE_OP_NOT_REGEX: + if (!rule->value) { + flb_condition_destroy(compiled); + return NULL; + } + ret = flb_condition_add_rule(compiled, rule->field, op, + rule->value, 1, RECORD_CONTEXT_BODY); + break; + case FLB_RULE_OP_GT: + case FLB_RULE_OP_LT: + case FLB_RULE_OP_GTE: + case FLB_RULE_OP_LTE: + if (!rule->value) { + flb_condition_destroy(compiled); + return NULL; + } + if (parse_numeric_value(rule->value, &numeric_value) != 0) { + flb_condition_destroy(compiled); + return NULL; + } + ret = flb_condition_add_rule(compiled, rule->field, op, + &numeric_value, 1, RECORD_CONTEXT_BODY); + break; + case FLB_RULE_OP_IN: + case FLB_RULE_OP_NOT_IN: + if (!rule->values || rule->values_count == 0) { + flb_condition_destroy(compiled); + return NULL; + } + ret = flb_condition_add_rule(compiled, rule->field, op, + rule->values, + (int) rule->values_count, + RECORD_CONTEXT_BODY); + break; + default: + flb_condition_destroy(compiled); + return NULL; + } + + if (ret != FLB_TRUE) { + flb_condition_destroy(compiled); + return NULL; + } + } + + return compiled; +} + +static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition) +{ + if (!condition) { + return NULL; + } + + if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_FAILURE) { + return NULL; + } + + if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_SUCCESS && + condition->compiled) { + return condition->compiled; + } + + condition->compiled = route_condition_compile(condition); + if (!condition->compiled) { + condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_FAILURE; + return NULL; + } + + condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_SUCCESS; + return condition->compiled; +} + +static void route_condition_record_destroy(struct flb_mp_chunk_record *record) +{ + if (!record) { + return; + } + + if (record->cobj_record) { + cfl_object_destroy(record->cobj_record); + record->cobj_record = NULL; + } + + if (record->cobj_metadata) { + cfl_object_destroy(record->cobj_metadata); + record->cobj_metadata = NULL; + } +} + From 3e147e14ab2197a13a7cea2972e0323cacb4dc88 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 20 Oct 2025 12:30:09 -0600 Subject: [PATCH 04/30] task: add handling for direct route Signed-off-by: Eduardo Silva --- src/flb_task.c | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/flb_task.c b/src/flb_task.c index 75263ff70d4..c8e2d404299 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -360,6 +360,7 @@ struct flb_task *flb_task_create(uint64_t ref_id, { int count = 0; int total_events = 0; + int direct_count = 0; struct flb_task *task; struct flb_event_chunk *evc; struct flb_task_route *route; @@ -415,11 +416,18 @@ struct flb_task *flb_task_create(uint64_t ref_id, /* Direct connects betweek input <> outputs (API based) */ if (mk_list_size(&i_ins->routes_direct) > 0) { + direct_count = 0; + mk_list_foreach(i_head, &i_ins->routes_direct) { route_path = mk_list_entry(i_head, struct flb_router_path, _head); + + if (flb_router_path_should_route(task->event_chunk, route_path) == FLB_FALSE) { + continue; + } + o_ins = route_path->ins; - route = flb_malloc(sizeof(struct flb_task_route)); + route = flb_calloc(1, sizeof(struct flb_task_route)); if (!route) { flb_errno(); task->event_chunk->data = NULL; @@ -427,10 +435,22 @@ struct flb_task *flb_task_create(uint64_t ref_id, return NULL; } + route->status = FLB_TASK_ROUTE_INACTIVE; route->out = o_ins; mk_list_add(&route->_head, &task->routes); + direct_count++; + } + + if (direct_count == 0) { + flb_debug("[task] dropping direct task=%p id=%i without matching routes", + task, task->id); + task->event_chunk->data = NULL; + flb_task_destroy(task, FLB_TRUE); + return NULL; } - flb_debug("[task] created direct task=%p id=%i OK", task, task->id); + + flb_debug("[task] created direct task=%p id=%i with %i route(s)", + task, task->id, direct_count); return task; } @@ -444,7 +464,7 @@ struct flb_task *flb_task_create(uint64_t ref_id, continue; } - if (flb_routes_mask_get_bit(task_ic->routes_mask, + if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id, o_ins->config) != 0) { route = flb_calloc(1, sizeof(struct flb_task_route)); From a4ace4e67607a0a0924db7135e0c0ba30cd16421 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 20 Oct 2025 12:30:39 -0600 Subject: [PATCH 05/30] tests: internal: router_config: extend test for conditional logs Signed-off-by: Eduardo Silva --- tests/internal/router_config.c | 355 +++++++++++++++++++++++++++++++++ 1 file changed, 355 insertions(+) diff --git a/tests/internal/router_config.c b/tests/internal/router_config.c index c479c978021..8771e2e58b6 100644 --- a/tests/internal/router_config.c +++ b/tests/internal/router_config.c @@ -6,7 +6,9 @@ #include #include #include +#include #include +#include #include #include @@ -27,6 +29,105 @@ static struct cfl_variant *clone_variant(struct cfl_variant *var); +static int build_log_chunk(const char *level, + struct flb_log_event_encoder *encoder, + struct flb_event_chunk *chunk) +{ + int ret; + + if (!level || !encoder || !chunk) { + return -1; + } + + ret = flb_log_event_encoder_init(encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_begin_record(encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + ret = flb_log_event_encoder_set_current_timestamp(encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + ret = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_STRING_VALUE("level", 5), + FLB_LOG_EVENT_CSTRING_VALUE(level)); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + ret = flb_log_event_encoder_commit_record(encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + memset(chunk, 0, sizeof(*chunk)); + chunk->type = FLB_EVENT_TYPE_LOGS; + chunk->data = encoder->output_buffer; + chunk->size = encoder->output_length; + chunk->total_events = 1; + + return 0; +} + +static void free_route_condition(struct flb_route_condition *condition) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct flb_route_condition_rule *rule; + size_t idx; + + if (!condition) { + return; + } + + if (condition->compiled) { + flb_condition_destroy(condition->compiled); + } + + cfl_list_foreach_safe(head, tmp, &condition->rules) { + rule = cfl_list_entry(head, struct flb_route_condition_rule, _head); + cfl_list_del(&rule->_head); + + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + if (rule->value) { + flb_sds_destroy(rule->value); + } + if (rule->values) { + for (idx = 0; idx < rule->values_count; idx++) { + if (rule->values[idx]) { + flb_sds_destroy(rule->values[idx]); + } + } + flb_free(rule->values); + } + + flb_free(rule); + } + + flb_free(condition); +} + static struct cfl_array *clone_array(struct cfl_array *array) { struct cfl_array *copy; @@ -850,6 +951,257 @@ void test_router_route_default_precedence() flb_cf_destroy(cf); } +static void test_router_condition_eval_logs_match() +{ + struct flb_route route; + struct flb_route_condition *condition; + struct flb_route_condition_rule *rule; + struct flb_log_event_encoder encoder; + struct flb_event_chunk chunk; + int ret; + + memset(&route, 0, sizeof(route)); + cfl_list_init(&route.outputs); + cfl_list_init(&route.processors); + + condition = flb_calloc(1, sizeof(struct flb_route_condition)); + TEST_CHECK(condition != NULL); + if (!condition) { + return; + } + + cfl_list_init(&condition->rules); + condition->op = FLB_COND_OP_AND; + condition->compiled_status = 0; + condition->compiled = NULL; + condition->is_default = FLB_FALSE; + + rule = flb_calloc(1, sizeof(struct flb_route_condition_rule)); + TEST_CHECK(rule != NULL); + if (!rule) { + free_route_condition(condition); + return; + } + + cfl_list_init(&rule->_head); + rule->field = flb_sds_create("$level"); + rule->op = flb_sds_create("eq"); + rule->value = flb_sds_create("error"); + TEST_CHECK(rule->field != NULL && rule->op != NULL && rule->value != NULL); + if (!rule->field || !rule->op || !rule->value) { + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + if (rule->value) { + flb_sds_destroy(rule->value); + } + flb_free(rule); + free_route_condition(condition); + return; + } + + cfl_list_add(&rule->_head, &condition->rules); + + route.condition = condition; + route.signals = FLB_ROUTER_SIGNAL_LOGS; + + ret = build_log_chunk("error", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_condition_eval_logs(&chunk, &route) == FLB_TRUE); + } + flb_log_event_encoder_destroy(&encoder); + + ret = build_log_chunk("info", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_condition_eval_logs(&chunk, &route) == FLB_FALSE); + } + flb_log_event_encoder_destroy(&encoder); + + free_route_condition(condition); +} + +static void test_router_condition_eval_logs_in_operator() +{ + struct flb_route route; + struct flb_route_condition *condition; + struct flb_route_condition_rule *rule; + struct flb_log_event_encoder encoder; + struct flb_event_chunk chunk; + int ret; + + memset(&route, 0, sizeof(route)); + cfl_list_init(&route.outputs); + cfl_list_init(&route.processors); + + condition = flb_calloc(1, sizeof(struct flb_route_condition)); + TEST_CHECK(condition != NULL); + if (!condition) { + return; + } + + cfl_list_init(&condition->rules); + condition->op = FLB_COND_OP_AND; + condition->compiled_status = 0; + condition->compiled = NULL; + condition->is_default = FLB_FALSE; + + rule = flb_calloc(1, sizeof(struct flb_route_condition_rule)); + TEST_CHECK(rule != NULL); + if (!rule) { + free_route_condition(condition); + return; + } + + cfl_list_init(&rule->_head); + rule->field = flb_sds_create("$level"); + rule->op = flb_sds_create("in"); + TEST_CHECK(rule->field != NULL && rule->op != NULL); + if (!rule->field || !rule->op) { + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + flb_free(rule); + free_route_condition(condition); + return; + } + + rule->values_count = 2; + rule->values = flb_calloc(rule->values_count, sizeof(flb_sds_t)); + TEST_CHECK(rule->values != NULL); + if (!rule->values) { + free_route_condition(condition); + flb_sds_destroy(rule->field); + flb_sds_destroy(rule->op); + flb_free(rule); + return; + } + + rule->values[0] = flb_sds_create("error"); + rule->values[1] = flb_sds_create("fatal"); + TEST_CHECK(rule->values[0] != NULL && rule->values[1] != NULL); + if (!rule->values[0] || !rule->values[1]) { + if (rule->values[0]) { + flb_sds_destroy(rule->values[0]); + } + if (rule->values[1]) { + flb_sds_destroy(rule->values[1]); + } + flb_free(rule->values); + flb_sds_destroy(rule->field); + flb_sds_destroy(rule->op); + flb_free(rule); + free_route_condition(condition); + return; + } + + cfl_list_add(&rule->_head, &condition->rules); + + route.condition = condition; + route.signals = FLB_ROUTER_SIGNAL_LOGS; + + ret = build_log_chunk("fatal", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_condition_eval_logs(&chunk, &route) == FLB_TRUE); + } + flb_log_event_encoder_destroy(&encoder); + + ret = build_log_chunk("debug", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_condition_eval_logs(&chunk, &route) == FLB_FALSE); + } + flb_log_event_encoder_destroy(&encoder); + + free_route_condition(condition); +} + +static void test_router_path_should_route_condition() +{ + struct flb_router_path path; + struct flb_route route; + struct flb_route_condition *condition; + struct flb_route_condition_rule *rule; + struct flb_log_event_encoder encoder; + struct flb_event_chunk chunk; + int ret; + + memset(&route, 0, sizeof(route)); + cfl_list_init(&route.outputs); + cfl_list_init(&route.processors); + + condition = flb_calloc(1, sizeof(struct flb_route_condition)); + TEST_CHECK(condition != NULL); + if (!condition) { + return; + } + + cfl_list_init(&condition->rules); + condition->op = FLB_COND_OP_AND; + condition->compiled_status = 0; + condition->compiled = NULL; + condition->is_default = FLB_FALSE; + + rule = flb_calloc(1, sizeof(struct flb_route_condition_rule)); + TEST_CHECK(rule != NULL); + if (!rule) { + free_route_condition(condition); + return; + } + + cfl_list_init(&rule->_head); + rule->field = flb_sds_create("$level"); + rule->op = flb_sds_create("eq"); + rule->value = flb_sds_create("error"); + TEST_CHECK(rule->field != NULL && rule->op != NULL && rule->value != NULL); + if (!rule->field || !rule->op || !rule->value) { + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + if (rule->value) { + flb_sds_destroy(rule->value); + } + flb_free(rule); + free_route_condition(condition); + return; + } + + cfl_list_add(&rule->_head, &condition->rules); + + route.condition = condition; + route.signals = FLB_ROUTER_SIGNAL_LOGS; + + memset(&path, 0, sizeof(path)); + path.route = &route; + + ret = build_log_chunk("error", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_router_path_should_route(&chunk, &path) == FLB_TRUE); + } + flb_log_event_encoder_destroy(&encoder); + + ret = build_log_chunk("info", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_router_path_should_route(&chunk, &path) == FLB_FALSE); + } + flb_log_event_encoder_destroy(&encoder); + + free_route_condition(condition); +} + TEST_LIST = { { "parse_basic", test_router_config_parse_basic }, { "duplicate_route", test_router_config_duplicate_route }, @@ -859,5 +1211,8 @@ TEST_LIST = { { "apply_config_success", test_router_apply_config_success }, { "apply_config_missing_output", test_router_apply_config_missing_output }, { "route_default_precedence", test_router_route_default_precedence }, + { "condition_eval_logs_match", test_router_condition_eval_logs_match }, + { "condition_eval_logs_in_operator", test_router_condition_eval_logs_in_operator }, + { "path_should_route_condition", test_router_path_should_route_condition }, { 0 } }; From 947c87442dfb140ef64b69023fc637e9c2396fc7 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 20 Oct 2025 16:56:05 -0600 Subject: [PATCH 06/30] wip Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_router.h | 21 ++++ src/flb_event.c | 1 - src/flb_router_condition.c | 195 ++++++++++++++++++++++---------- src/flb_task.c | 35 +++++- tests/internal/router_config.c | 32 ++++-- 5 files changed, 215 insertions(+), 69 deletions(-) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index e7c0ba92610..4f687a5c357 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -30,6 +30,16 @@ #include #include +struct flb_mp_chunk_cobj; +struct flb_log_event_encoder; +struct flb_log_event_decoder; + +struct flb_router_chunk_context { + struct flb_mp_chunk_cobj *chunk_cobj; + struct flb_log_event_encoder *log_encoder; + struct flb_log_event_decoder *log_decoder; +}; + struct flb_router_path { struct flb_output_instance *ins; struct flb_route *route; @@ -135,15 +145,26 @@ void flb_router_exit(struct flb_config *config); uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk); +int flb_router_chunk_context_init(struct flb_router_chunk_context *context); +void flb_router_chunk_context_reset(struct flb_router_chunk_context *context); +void flb_router_chunk_context_destroy(struct flb_router_chunk_context *context); +int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *context, + struct flb_event_chunk *chunk); + int flb_route_condition_eval(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route); int flb_condition_eval_logs(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route); int flb_condition_eval_metrics(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route); int flb_condition_eval_traces(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route); int flb_router_path_should_route(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_router_path *path); struct flb_cf; diff --git a/src/flb_event.c b/src/flb_event.c index 2edf3a29643..906344e05ec 100644 --- a/src/flb_event.c +++ b/src/flb_event.c @@ -23,7 +23,6 @@ #include #include - struct flb_event_chunk *flb_event_chunk_create(int type, int total_events, char *tag_buf, int tag_len, diff --git a/src/flb_router_condition.c b/src/flb_router_condition.c index fa1ad515954..cb24b89b9d7 100644 --- a/src/flb_router_condition.c +++ b/src/flb_router_condition.c @@ -21,15 +21,117 @@ #include #include #include +#include #include -#include #include #define FLB_ROUTE_CONDITION_COMPILED_SUCCESS 1 #define FLB_ROUTE_CONDITION_COMPILED_FAILURE -1 static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition); -static void route_condition_record_destroy(struct flb_mp_chunk_record *record); + +int flb_router_chunk_context_init(struct flb_router_chunk_context *context) +{ + if (!context) { + return -1; + } + + context->chunk_cobj = NULL; + context->log_encoder = NULL; + context->log_decoder = NULL; + + return 0; +} + +void flb_router_chunk_context_reset(struct flb_router_chunk_context *context) +{ + if (!context) { + return; + } + + if (context->chunk_cobj) { + flb_mp_chunk_cobj_destroy(context->chunk_cobj); + context->chunk_cobj = NULL; + } + + if (context->log_decoder) { + flb_log_event_decoder_destroy(context->log_decoder); + context->log_decoder = NULL; + } + + if (context->log_encoder) { + flb_log_event_encoder_destroy(context->log_encoder); + context->log_encoder = NULL; + } +} + +void flb_router_chunk_context_destroy(struct flb_router_chunk_context *context) +{ + flb_router_chunk_context_reset(context); +} + +int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *context, + struct flb_event_chunk *chunk) +{ + int ret; + struct flb_mp_chunk_record *record; + + if (!context || !chunk) { + return -1; + } + + if (chunk->type != FLB_EVENT_TYPE_LOGS) { + return 0; + } + + if (context->chunk_cobj) { + return 0; + } + + if (!chunk->data || chunk->size == 0) { + return -1; + } + + if (!context->log_encoder) { + context->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); + if (!context->log_encoder) { + return -1; + } + } + + if (!context->log_decoder) { + context->log_decoder = flb_log_event_decoder_create(NULL, 0); + if (!context->log_decoder) { + flb_router_chunk_context_reset(context); + return -1; + } + flb_log_event_decoder_read_groups(context->log_decoder, FLB_TRUE); + } + + flb_log_event_decoder_reset(context->log_decoder, chunk->data, chunk->size); + + context->chunk_cobj = flb_mp_chunk_cobj_create(context->log_encoder, + context->log_decoder); + if (!context->chunk_cobj) { + flb_router_chunk_context_reset(context); + return -1; + } + + while ((ret = flb_mp_chunk_cobj_record_next(context->chunk_cobj, &record)) == + FLB_MP_CHUNK_RECORD_OK) { + continue; + } + + if (ret != FLB_MP_CHUNK_RECORD_EOF) { + flb_router_chunk_context_reset(context); + return -1; + } + + context->chunk_cobj->record_pos = NULL; + context->chunk_cobj->condition = NULL; + + return 0; +} uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk) { @@ -52,21 +154,16 @@ uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk) } int flb_condition_eval_logs(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route) { - int ret; int result = FLB_FALSE; struct flb_route_condition *condition; struct flb_condition *compiled; - struct flb_log_event_decoder decoder; - struct flb_log_event event; - struct flb_mp_chunk_record record; - - if (!chunk || !route || !route->condition) { - return FLB_FALSE; - } + struct flb_mp_chunk_record *record; + struct cfl_list *head; - if (!chunk->data || chunk->size == 0) { + if (!chunk || !context || !route || !route->condition) { return FLB_FALSE; } @@ -77,66 +174,50 @@ int flb_condition_eval_logs(struct flb_event_chunk *chunk, return FLB_FALSE; } - ret = flb_log_event_decoder_init(&decoder, chunk->data, chunk->size); - if (ret != FLB_EVENT_DECODER_SUCCESS) { + if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) { return FLB_FALSE; } - flb_log_event_decoder_read_groups(&decoder, FLB_TRUE); - - while ((ret = flb_log_event_decoder_next(&decoder, &event)) == FLB_EVENT_DECODER_SUCCESS) { - memset(&record, 0, sizeof(record)); - record.event = event; - - if (event.metadata) { - record.cobj_metadata = flb_mp_object_to_cfl(event.metadata); - if (!record.cobj_metadata) { - route_condition_record_destroy(&record); - break; - } - } + if (!context->chunk_cobj) { + return FLB_FALSE; + } - if (event.body) { - record.cobj_record = flb_mp_object_to_cfl(event.body); - if (!record.cobj_record) { - route_condition_record_destroy(&record); - break; - } - } + cfl_list_foreach(head, &context->chunk_cobj->records) { + record = cfl_list_entry(head, struct flb_mp_chunk_record, _head); - if (flb_condition_evaluate(compiled, &record) == FLB_TRUE) { + if (flb_condition_evaluate(compiled, record) == FLB_TRUE) { result = FLB_TRUE; - route_condition_record_destroy(&record); break; } - - route_condition_record_destroy(&record); } - flb_log_event_decoder_destroy(&decoder); - return result; } int flb_condition_eval_metrics(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route) { (void) chunk; + (void) context; (void) route; return FLB_FALSE; } int flb_condition_eval_traces(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route) { (void) chunk; + (void) context; (void) route; return FLB_FALSE; } int flb_route_condition_eval(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route) { uint32_t signal; @@ -164,11 +245,11 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk, switch (signal) { case FLB_ROUTER_SIGNAL_LOGS: - return flb_condition_eval_logs(chunk, route); + return flb_condition_eval_logs(chunk, context, route); case FLB_ROUTER_SIGNAL_METRICS: - return flb_condition_eval_metrics(chunk, route); + return flb_condition_eval_metrics(chunk, context, route); case FLB_ROUTER_SIGNAL_TRACES: - return flb_condition_eval_traces(chunk, route); + return flb_condition_eval_traces(chunk, context, route); default: break; } @@ -177,17 +258,28 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk, } int flb_router_path_should_route(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_router_path *path) { if (!path) { return FLB_FALSE; } + if (chunk && chunk->type == FLB_EVENT_TYPE_LOGS) { + if (!context) { + return FLB_FALSE; + } + + if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) { + return FLB_FALSE; + } + } + if (!path->route) { return FLB_TRUE; } - return flb_route_condition_eval(chunk, path->route); + return flb_route_condition_eval(chunk, context, path->route); } static int parse_rule_operator(const flb_sds_t op_str, @@ -357,20 +449,3 @@ static struct flb_condition *route_condition_get_compiled(struct flb_route_condi return condition->compiled; } -static void route_condition_record_destroy(struct flb_mp_chunk_record *record) -{ - if (!record) { - return; - } - - if (record->cobj_record) { - cfl_object_destroy(record->cobj_record); - record->cobj_record = NULL; - } - - if (record->cobj_metadata) { - cfl_object_destroy(record->cobj_metadata); - record->cobj_metadata = NULL; - } -} - diff --git a/src/flb_task.c b/src/flb_task.c index c8e2d404299..c77267bcd32 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -369,6 +369,8 @@ struct flb_task *flb_task_create(uint64_t ref_id, struct flb_input_chunk *task_ic; struct mk_list *i_head; struct mk_list *o_head; + struct flb_router_chunk_context router_context; + int router_context_initialized = FLB_FALSE; /* No error status */ *err = FLB_FALSE; @@ -393,6 +395,15 @@ struct flb_task *flb_task_create(uint64_t ref_id, return NULL; } + if (flb_router_chunk_context_init(&router_context) != 0) { + flb_error("[task] failed to initialize router chunk context"); + flb_event_chunk_destroy(evc); + flb_free(task); + *err = FLB_TRUE; + return NULL; + } + router_context_initialized = FLB_TRUE; + #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_debug("add trace to task"); @@ -421,7 +432,9 @@ struct flb_task *flb_task_create(uint64_t ref_id, mk_list_foreach(i_head, &i_ins->routes_direct) { route_path = mk_list_entry(i_head, struct flb_router_path, _head); - if (flb_router_path_should_route(task->event_chunk, route_path) == FLB_FALSE) { + if (flb_router_path_should_route(task->event_chunk, + &router_context, + route_path) == FLB_FALSE) { continue; } @@ -430,6 +443,10 @@ struct flb_task *flb_task_create(uint64_t ref_id, route = flb_calloc(1, sizeof(struct flb_task_route)); if (!route) { flb_errno(); + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } task->event_chunk->data = NULL; flb_task_destroy(task, FLB_TRUE); return NULL; @@ -444,6 +461,10 @@ struct flb_task *flb_task_create(uint64_t ref_id, if (direct_count == 0) { flb_debug("[task] dropping direct task=%p id=%i without matching routes", task, task->id); + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } task->event_chunk->data = NULL; flb_task_destroy(task, FLB_TRUE); return NULL; @@ -451,6 +472,10 @@ struct flb_task *flb_task_create(uint64_t ref_id, flb_debug("[task] created direct task=%p id=%i with %i route(s)", task, task->id, direct_count); + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } return task; } @@ -484,11 +509,19 @@ struct flb_task *flb_task_create(uint64_t ref_id, if (count == 0) { flb_debug("[task] created task=%p id=%i without routes, dropping.", task, task->id); + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } task->event_chunk->data = NULL; flb_task_destroy(task, FLB_TRUE); return NULL; } + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } flb_debug("[task] created task=%p id=%i OK", task, task->id); return task; } diff --git a/tests/internal/router_config.c b/tests/internal/router_config.c index 8771e2e58b6..42c19ec807e 100644 --- a/tests/internal/router_config.c +++ b/tests/internal/router_config.c @@ -939,7 +939,7 @@ void test_router_route_default_precedence() memset(&chunk, 0, sizeof(chunk)); chunk.type = FLB_EVENT_TYPE_LOGS; - TEST_CHECK(flb_route_condition_eval(&chunk, route) == FLB_TRUE); + TEST_CHECK(flb_route_condition_eval(&chunk, NULL, route) == FLB_TRUE); output = cfl_list_entry(route->outputs.next, struct flb_route_output, _head); TEST_CHECK(strcmp(output->name, "lib_route") == 0); @@ -958,6 +958,7 @@ static void test_router_condition_eval_logs_match() struct flb_route_condition_rule *rule; struct flb_log_event_encoder encoder; struct flb_event_chunk chunk; + struct flb_router_chunk_context context; int ret; memset(&route, 0, sizeof(route)); @@ -1008,20 +1009,25 @@ static void test_router_condition_eval_logs_match() route.condition = condition; route.signals = FLB_ROUTER_SIGNAL_LOGS; + flb_router_chunk_context_init(&context); + ret = build_log_chunk("error", &encoder, &chunk); TEST_CHECK(ret == 0); if (ret == 0) { - TEST_CHECK(flb_condition_eval_logs(&chunk, &route) == FLB_TRUE); + TEST_CHECK(flb_condition_eval_logs(&chunk, &context, &route) == FLB_TRUE); } + flb_router_chunk_context_reset(&context); flb_log_event_encoder_destroy(&encoder); ret = build_log_chunk("info", &encoder, &chunk); TEST_CHECK(ret == 0); if (ret == 0) { - TEST_CHECK(flb_condition_eval_logs(&chunk, &route) == FLB_FALSE); + TEST_CHECK(flb_condition_eval_logs(&chunk, &context, &route) == FLB_FALSE); } + flb_router_chunk_context_reset(&context); flb_log_event_encoder_destroy(&encoder); + flb_router_chunk_context_destroy(&context); free_route_condition(condition); } @@ -1032,6 +1038,7 @@ static void test_router_condition_eval_logs_in_operator() struct flb_route_condition_rule *rule; struct flb_log_event_encoder encoder; struct flb_event_chunk chunk; + struct flb_router_chunk_context context; int ret; memset(&route, 0, sizeof(route)); @@ -1107,20 +1114,25 @@ static void test_router_condition_eval_logs_in_operator() route.condition = condition; route.signals = FLB_ROUTER_SIGNAL_LOGS; + flb_router_chunk_context_init(&context); + ret = build_log_chunk("fatal", &encoder, &chunk); TEST_CHECK(ret == 0); if (ret == 0) { - TEST_CHECK(flb_condition_eval_logs(&chunk, &route) == FLB_TRUE); + TEST_CHECK(flb_condition_eval_logs(&chunk, &context, &route) == FLB_TRUE); } + flb_router_chunk_context_reset(&context); flb_log_event_encoder_destroy(&encoder); ret = build_log_chunk("debug", &encoder, &chunk); TEST_CHECK(ret == 0); if (ret == 0) { - TEST_CHECK(flb_condition_eval_logs(&chunk, &route) == FLB_FALSE); + TEST_CHECK(flb_condition_eval_logs(&chunk, &context, &route) == FLB_FALSE); } + flb_router_chunk_context_reset(&context); flb_log_event_encoder_destroy(&encoder); + flb_router_chunk_context_destroy(&context); free_route_condition(condition); } @@ -1132,6 +1144,7 @@ static void test_router_path_should_route_condition() struct flb_route_condition_rule *rule; struct flb_log_event_encoder encoder; struct flb_event_chunk chunk; + struct flb_router_chunk_context context; int ret; memset(&route, 0, sizeof(route)); @@ -1185,20 +1198,25 @@ static void test_router_path_should_route_condition() memset(&path, 0, sizeof(path)); path.route = &route; + flb_router_chunk_context_init(&context); + ret = build_log_chunk("error", &encoder, &chunk); TEST_CHECK(ret == 0); if (ret == 0) { - TEST_CHECK(flb_router_path_should_route(&chunk, &path) == FLB_TRUE); + TEST_CHECK(flb_router_path_should_route(&chunk, &context, &path) == FLB_TRUE); } + flb_router_chunk_context_reset(&context); flb_log_event_encoder_destroy(&encoder); ret = build_log_chunk("info", &encoder, &chunk); TEST_CHECK(ret == 0); if (ret == 0) { - TEST_CHECK(flb_router_path_should_route(&chunk, &path) == FLB_FALSE); + TEST_CHECK(flb_router_path_should_route(&chunk, &context, &path) == FLB_FALSE); } + flb_router_chunk_context_reset(&context); flb_log_event_encoder_destroy(&encoder); + flb_router_chunk_context_destroy(&context); free_route_condition(condition); } From 840e2ba874ae63f818b3cd06d8a98e2d07b84fa4 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 20 Oct 2025 22:05:37 -0600 Subject: [PATCH 07/30] wip Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_router.h | 2 + src/flb_input_log.c | 629 +++++++++++++++++++++++++++++++- src/flb_router_condition.c | 9 + 3 files changed, 639 insertions(+), 1 deletion(-) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index 4f687a5c357..06b6418c1d3 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -167,6 +167,8 @@ int flb_router_path_should_route(struct flb_event_chunk *chunk, struct flb_router_chunk_context *context, struct flb_router_path *path); +struct flb_condition *flb_router_route_get_condition(struct flb_route *route); + struct flb_cf; int flb_router_config_parse(struct flb_cf *cf, diff --git a/src/flb_input_log.c b/src/flb_input_log.c index ac4b5cdfa67..75dd5aae433 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -23,6 +23,618 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +struct flb_route_payload { + struct flb_route *route; + int is_default; + flb_sds_t tag; + char *data; + size_t size; + size_t total_records; + struct mk_list _head; +}; + +static void route_payload_destroy(struct flb_route_payload *payload) +{ + if (!payload) { + return; + } + + if (!mk_list_entry_is_orphan(&payload->_head)) { + mk_list_del(&payload->_head); + } + + if (payload->tag) { + flb_sds_destroy(payload->tag); + } + + if (payload->data) { + flb_free(payload->data); + } + + flb_free(payload); +} + +static struct flb_route_payload *route_payload_find(struct mk_list *payloads, + struct flb_route *route) +{ + struct mk_list *head; + struct flb_route_payload *payload; + + if (!payloads || !route) { + return NULL; + } + + mk_list_foreach(head, payloads) { + payload = mk_list_entry(head, struct flb_route_payload, _head); + + if (payload->route == route) { + return payload; + } + } + + return NULL; +} + +static int append_output_to_payload_tag(struct flb_route_payload *payload, + struct flb_output_instance *out) +{ + const char *identifier; + + if (!payload || !out) { + return -1; + } + + if (out->alias) { + identifier = out->alias; + } + else { + identifier = flb_output_name(out); + } + + if (!identifier) { + return -1; + } + + if (!payload->tag) { + payload->tag = flb_sds_create(identifier); + if (!payload->tag) { + return -1; + } + } + else { + payload->tag = flb_sds_cat(payload->tag, ",", 1); + if (!payload->tag) { + return -1; + } + + payload->tag = flb_sds_cat(payload->tag, identifier, strlen(identifier)); + if (!payload->tag) { + return -1; + } + } + + return 0; +} + +static int encode_empty_map(char **out_buf, size_t *out_size) +{ + char *buf; + + if (!out_buf || !out_size) { + return -1; + } + + buf = flb_malloc(1); + if (!buf) { + flb_errno(); + return -1; + } + + buf[0] = 0x80; + + *out_buf = buf; + *out_size = 1; + + return 0; +} + +static int encode_cfl_object_or_empty(struct cfl_object *obj, + char **out_buf, + size_t *out_size) +{ + if (!out_buf || !out_size) { + return -1; + } + + if (obj) { + return flb_mp_cfl_to_msgpack(obj, out_buf, out_size); + } + + return encode_empty_map(out_buf, out_size); +} + +static int encode_chunk_record(struct flb_log_event_encoder *encoder, + struct flb_mp_chunk_record *record) +{ + int ret; + int record_type; + char *mp_buf = NULL; + size_t mp_size = 0; + + if (!encoder || !record) { + return -1; + } + + ret = flb_log_event_encoder_begin_record(encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_set_timestamp(encoder, &record->event.timestamp); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + if (record->event.timestamp.tm.tv_sec >= 0) { + record_type = FLB_LOG_EVENT_NORMAL; + } + else if (record->event.timestamp.tm.tv_sec == FLB_LOG_EVENT_GROUP_START) { + record_type = FLB_LOG_EVENT_GROUP_START; + } + else if (record->event.timestamp.tm.tv_sec == FLB_LOG_EVENT_GROUP_END) { + record_type = FLB_LOG_EVENT_GROUP_END; + } + else { + record_type = FLB_LOG_EVENT_NORMAL; + } + + ret = encode_cfl_object_or_empty(record->cobj_metadata, &mp_buf, &mp_size); + if (ret != 0) { + return -1; + } + + ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(encoder, + mp_buf, + mp_size); + flb_free(mp_buf); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + mp_buf = NULL; + mp_size = 0; + + if (record_type == FLB_LOG_EVENT_GROUP_START && + record->cobj_group_attributes) { + ret = flb_mp_cfl_to_msgpack(record->cobj_group_attributes, + &mp_buf, + &mp_size); + } + else if (record->cobj_record) { + ret = flb_mp_cfl_to_msgpack(record->cobj_record, &mp_buf, &mp_size); + } + else { + ret = encode_empty_map(&mp_buf, &mp_size); + } + + if (ret != 0) { + return -1; + } + + ret = flb_log_event_encoder_set_body_from_raw_msgpack(encoder, + mp_buf, + mp_size); + flb_free(mp_buf); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_commit_record(encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + return 0; +} + +static int build_payload_for_route(struct flb_route_payload *payload, + struct flb_mp_chunk_record **records, + size_t record_count, + uint8_t *matched_non_default) +{ + size_t i; + int matched; + int ret; + struct flb_condition *compiled; + struct flb_log_event_encoder *encoder; + + if (!payload || !records || record_count == 0 || !matched_non_default) { + return -1; + } + + compiled = flb_router_route_get_condition(payload->route); + if (!compiled) { + return 0; + } + + encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); + if (!encoder) { + return -1; + } + + matched = 0; + + for (i = 0; i < record_count; i++) { + if (flb_condition_evaluate(compiled, records[i]) != FLB_TRUE) { + continue; + } + + ret = encode_chunk_record(encoder, records[i]); + if (ret != 0) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + matched_non_default[i] = 1; + matched++; + } + + if (matched == 0) { + flb_log_event_encoder_destroy(encoder); + return 0; + } + + payload->data = flb_malloc(encoder->output_length); + if (!payload->data) { + flb_log_event_encoder_destroy(encoder); + flb_errno(); + return -1; + } + + memcpy(payload->data, encoder->output_buffer, encoder->output_length); + payload->size = encoder->output_length; + payload->total_records = matched; + + flb_log_event_encoder_destroy(encoder); + + return 0; +} + +static int build_payload_for_default_route(struct flb_route_payload *payload, + struct flb_mp_chunk_record **records, + size_t record_count, + uint8_t *matched_non_default) +{ + size_t i; + int matched; + int ret; + struct flb_condition *compiled; + struct flb_log_event_encoder *encoder; + + if (!payload || !records || !matched_non_default) { + return -1; + } + + encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); + if (!encoder) { + return -1; + } + + compiled = flb_router_route_get_condition(payload->route); + matched = 0; + + for (i = 0; i < record_count; i++) { + if (matched_non_default[i]) { + continue; + } + + if (compiled && + flb_condition_evaluate(compiled, records[i]) != FLB_TRUE) { + continue; + } + + ret = encode_chunk_record(encoder, records[i]); + if (ret != 0) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + matched++; + } + + if (matched == 0) { + flb_log_event_encoder_destroy(encoder); + return 0; + } + + payload->data = flb_malloc(encoder->output_length); + if (!payload->data) { + flb_log_event_encoder_destroy(encoder); + flb_errno(); + return -1; + } + + memcpy(payload->data, encoder->output_buffer, encoder->output_length); + payload->size = encoder->output_length; + payload->total_records = matched; + + flb_log_event_encoder_destroy(encoder); + + return 0; +} + +static void route_payload_list_destroy(struct mk_list *payloads) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_route_payload *payload; + + if (!payloads) { + return; + } + + mk_list_foreach_safe(head, tmp, payloads) { + payload = mk_list_entry(head, struct flb_route_payload, _head); + route_payload_destroy(payload); + } +} + +static int input_has_conditional_routes(struct flb_input_instance *ins) +{ + struct mk_list *head; + struct flb_router_path *route_path; + + if (!ins) { + return FLB_FALSE; + } + + mk_list_foreach(head, &ins->routes_direct) { + route_path = mk_list_entry(head, struct flb_router_path, _head); + + if (route_path->route && route_path->route->condition) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int split_and_append_route_payloads(struct flb_input_instance *ins, + size_t records, + const char *tag, + size_t tag_len, + const void *buf, + size_t buf_size) +{ + int ret; + int appended; + int handled; + int context_initialized = FLB_FALSE; + struct mk_list payloads; + struct mk_list *head; + struct mk_list *tmp; + struct flb_router_path *route_path; + struct flb_route_payload *payload; + struct flb_router_chunk_context context; + struct flb_event_chunk *chunk; + struct flb_mp_chunk_record **records_array = NULL; + uint8_t *matched_non_default = NULL; + size_t record_count; + size_t index; + const char *base_tag = tag; + size_t base_tag_len = tag_len; + + handled = FLB_FALSE; + + if (!ins || !buf || buf_size == 0) { + return 0; + } + + if (mk_list_is_empty(&ins->routes_direct) || + input_has_conditional_routes(ins) == FLB_FALSE) { + return 0; + } + + mk_list_init(&payloads); + + mk_list_foreach(head, &ins->routes_direct) { + route_path = mk_list_entry(head, struct flb_router_path, _head); + + if (!route_path->route || !route_path->route->condition) { + continue; + } + + payload = route_payload_find(&payloads, route_path->route); + if (!payload) { + payload = flb_calloc(1, sizeof(struct flb_route_payload)); + if (!payload) { + flb_errno(); + route_payload_list_destroy(&payloads); + return -1; + } + + payload->route = route_path->route; + payload->is_default = route_path->route->condition->is_default; + mk_list_add(&payload->_head, &payloads); + } + + if (append_output_to_payload_tag(payload, route_path->ins) != 0) { + route_payload_list_destroy(&payloads); + return -1; + } + } + + if (mk_list_is_empty(&payloads)) { + return 0; + } + + handled = FLB_TRUE; + + if (!base_tag) { + if (ins->tag && ins->tag_len > 0) { + base_tag = ins->tag; + base_tag_len = ins->tag_len; + } + else { + base_tag = ins->name; + base_tag_len = strlen(ins->name); + } + } + + chunk = flb_event_chunk_create(FLB_EVENT_TYPE_LOGS, + records, + (char *) base_tag, + base_tag_len, + (char *) buf, + buf_size); + if (!chunk) { + route_payload_list_destroy(&payloads); + return -1; + } + + if (flb_router_chunk_context_init(&context) != 0) { + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + context_initialized = FLB_TRUE; + + ret = flb_router_chunk_context_prepare_logs(&context, chunk); + if (ret != 0 || !context.chunk_cobj) { + if (context_initialized) { + flb_router_chunk_context_destroy(&context); + } + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + + record_count = cfl_list_size(&context.chunk_cobj->records); + if (record_count == 0) { + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return handled ? 1 : 0; + } + + records_array = flb_calloc(record_count, + sizeof(struct flb_mp_chunk_record *)); + matched_non_default = flb_calloc(record_count, sizeof(uint8_t)); + if (!records_array || !matched_non_default) { + flb_errno(); + if (records_array) { + flb_free(records_array); + } + if (matched_non_default) { + flb_free(matched_non_default); + } + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + + index = 0; + cfl_list_foreach(head, &context.chunk_cobj->records) { + records_array[index++] = + cfl_list_entry(head, struct flb_mp_chunk_record, _head); + } + + mk_list_foreach(head, &payloads) { + payload = mk_list_entry(head, struct flb_route_payload, _head); + + if (payload->is_default) { + continue; + } + + ret = build_payload_for_route(payload, + records_array, + record_count, + matched_non_default); + if (ret != 0) { + flb_free(records_array); + flb_free(matched_non_default); + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + } + + mk_list_foreach(head, &payloads) { + payload = mk_list_entry(head, struct flb_route_payload, _head); + + if (!payload->is_default) { + continue; + } + + ret = build_payload_for_default_route(payload, + records_array, + record_count, + matched_non_default); + if (ret != 0) { + flb_free(records_array); + flb_free(matched_non_default); + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + } + + flb_free(records_array); + flb_free(matched_non_default); + + mk_list_foreach_safe(head, tmp, &payloads) { + payload = mk_list_entry(head, struct flb_route_payload, _head); + + if (payload->total_records <= 0 || !payload->data) { + route_payload_destroy(payload); + } + } + + appended = 0; + mk_list_foreach(head, &payloads) { + payload = mk_list_entry(head, struct flb_route_payload, _head); + + ret = flb_input_chunk_append_raw(ins, + FLB_INPUT_LOGS, + payload->total_records, + payload->tag, + flb_sds_len(payload->tag), + payload->data, + payload->size); + if (ret != 0) { + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + + appended++; + } + + if (context_initialized) { + flb_router_chunk_context_destroy(&context); + } + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + + return handled ? (appended > 0 ? appended : 1) : 0; +} static int input_log_append(struct flb_input_instance *ins, size_t processor_starting_stage, @@ -68,10 +680,25 @@ static int input_log_append(struct flb_input_instance *ins, } } + ret = split_and_append_route_payloads(ins, records, tag, tag_len, + out_buf, out_size); + if (ret < 0) { + if (processor_is_active && buf != out_buf) { + flb_free(out_buf); + } + return -1; + } + + if (ret > 0) { + if (processor_is_active && buf != out_buf) { + flb_free(out_buf); + } + return 0; + } + ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, tag, tag_len, out_buf, out_size); - if (processor_is_active && buf != out_buf) { flb_free(out_buf); } diff --git a/src/flb_router_condition.c b/src/flb_router_condition.c index cb24b89b9d7..d24fcc67ba1 100644 --- a/src/flb_router_condition.c +++ b/src/flb_router_condition.c @@ -282,6 +282,15 @@ int flb_router_path_should_route(struct flb_event_chunk *chunk, return flb_route_condition_eval(chunk, context, path->route); } +struct flb_condition *flb_router_route_get_condition(struct flb_route *route) +{ + if (!route || !route->condition) { + return NULL; + } + + return route_condition_get_compiled(route->condition); +} + static int parse_rule_operator(const flb_sds_t op_str, enum flb_rule_operator *out) { From 10058f2e05b947505c9165f6d74d9f175899b8fe Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 21 Oct 2025 10:58:28 -0600 Subject: [PATCH 08/30] wip Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_router.h | 1 + src/flb_input_log.c | 28 ++++++++++++++++++++++++---- src/flb_router_config.c | 11 +++++++++++ 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index 06b6418c1d3..06df3868b1b 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -121,6 +121,7 @@ struct flb_route { flb_sds_t name; uint32_t signals; struct flb_route_condition *condition; + int per_record_routing; struct cfl_list outputs; struct cfl_list processors; struct cfl_list _head; diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 75dd5aae433..b33008b1fdc 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -17,6 +17,7 @@ * limitations under the License. */ +#include "fluent-bit/flb_pack.h" #include #include #include @@ -404,7 +405,8 @@ static int input_has_conditional_routes(struct flb_input_instance *ins) mk_list_foreach(head, &ins->routes_direct) { route_path = mk_list_entry(head, struct flb_router_path, _head); - if (route_path->route && route_path->route->condition) { + if (route_path->route && + (route_path->route->condition || route_path->route->per_record_routing)) { return FLB_TRUE; } } @@ -443,17 +445,25 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, return 0; } - if (mk_list_is_empty(&ins->routes_direct) || - input_has_conditional_routes(ins) == FLB_FALSE) { + if (mk_list_size(&ins->routes_direct) == 0) { + flb_info("[router] no direct routes found"); return 0; } + if (input_has_conditional_routes(ins) == FLB_FALSE) { + flb_info("[router] no conditional routes found"); + return 0; + } + + flb_info("[router] conditional routing triggered for %zu routes", mk_list_size(&ins->routes_direct)); + mk_list_init(&payloads); mk_list_foreach(head, &ins->routes_direct) { route_path = mk_list_entry(head, struct flb_router_path, _head); - if (!route_path->route || !route_path->route->condition) { + if (!route_path->route || + (!route_path->route->condition && !route_path->route->per_record_routing)) { continue; } @@ -742,6 +752,16 @@ int flb_input_log_append_records(struct flb_input_instance *ins, const void *buf, size_t buf_size) { int ret; + char tag_copy[128]; + + strncpy(tag_copy, tag, sizeof(tag_copy)); + tag_copy[sizeof(tag_copy) - 1] = '\0'; + + printf("--------------------------------\n"); + printf("appending records: tag: %s, tag_len: %zu, records: %zu\n", tag_copy, tag_len, records); + flb_pack_print(buf, buf_size); + printf("--------------------------------\n"); + printf("\n"); ret = input_log_append(ins, 0, records, tag, tag_len, buf, buf_size); return ret; diff --git a/src/flb_router_config.c b/src/flb_router_config.c index 2a3a50771f9..b3ae699b4c0 100644 --- a/src/flb_router_config.c +++ b/src/flb_router_config.c @@ -867,6 +867,7 @@ static int parse_route(struct cfl_variant *variant, cfl_list_init(&route->outputs); cfl_list_init(&route->processors); route->signals = signals; + route->per_record_routing = FLB_FALSE; // Default to false route->name = copy_from_cfl_sds(name_var->data.as_string); if (!route->name) { @@ -891,6 +892,16 @@ static int parse_route(struct cfl_variant *variant, } } + // Parse per_record_routing option + struct cfl_variant *per_record_var = cfl_kvlist_fetch(kvlist, "per_record_routing"); + if (per_record_var) { + int val; + if (variant_to_bool(per_record_var, &val) == 0) { + route->per_record_routing = val; + flb_info("[router] route '%s' per_record_routing=%d", route->name, val); + } + } + processors_var = cfl_kvlist_fetch(kvlist, "processors"); if (processors_var) { if (parse_processors(processors_var, &route->processors, config) != 0) { From eb798a75a2ced2ae3a0fa7ae4c89481fcbde71ae Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 21 Oct 2025 11:31:15 -0600 Subject: [PATCH 09/30] wip Signed-off-by: Eduardo Silva --- src/flb_conditionals.c | 31 +++++++-- src/flb_input_log.c | 121 +++++++++++++++++++++++++++++++++- tests/internal/conditionals.c | 18 ++++- 3 files changed, 163 insertions(+), 7 deletions(-) diff --git a/src/flb_conditionals.c b/src/flb_conditionals.c index 484931811ac..05f943be17e 100644 --- a/src/flb_conditionals.c +++ b/src/flb_conditionals.c @@ -363,6 +363,8 @@ int flb_condition_evaluate(struct flb_condition *cond, struct flb_condition_rule *rule; struct cfl_variant *record_variant; int result; + int any_rule_evaluated = FLB_FALSE; + int any_rule_matched = FLB_FALSE; if (!cond || !record) { flb_trace("[condition] NULL condition or record, returning TRUE"); @@ -382,9 +384,16 @@ int flb_condition_evaluate(struct flb_condition *cond, /* Get the variant for this rule's context */ record_variant = get_record_variant(record, rule->context); + any_rule_evaluated = FLB_TRUE; if (!record_variant) { flb_trace("[condition] no record variant found for context %d", rule->context); - continue; + if (cond->op == FLB_COND_OP_AND) { + flb_trace("[condition] AND condition missing field, returning FALSE"); + return FLB_FALSE; + } + else { + continue; + } } flb_trace("[condition] evaluating rule against record"); @@ -399,8 +408,22 @@ int flb_condition_evaluate(struct flb_condition *cond, flb_trace("[condition] OR condition with TRUE result, short-circuiting"); return FLB_TRUE; } + + if (result == FLB_TRUE) { + any_rule_matched = FLB_TRUE; + } + } + + if (cond->op == FLB_COND_OP_OR) { + flb_trace("[condition] final evaluation result: %d", any_rule_matched); + return any_rule_matched; } - flb_trace("[condition] final evaluation result: %d", (cond->op == FLB_COND_OP_AND) ? FLB_TRUE : FLB_FALSE); - return (cond->op == FLB_COND_OP_AND) ? FLB_TRUE : FLB_FALSE; -} \ No newline at end of file + if (any_rule_evaluated == FLB_FALSE) { + flb_trace("[condition] no rules evaluated, defaulting to FALSE for AND condition"); + return FLB_FALSE; + } + + flb_trace("[condition] final evaluation result: TRUE"); + return FLB_TRUE; +} diff --git a/src/flb_input_log.c b/src/flb_input_log.c index b33008b1fdc..03e7a4bc065 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -21,10 +21,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -130,6 +132,83 @@ static int append_output_to_payload_tag(struct flb_route_payload *payload, return 0; } +static int route_payload_apply_outputs(struct flb_input_instance *ins, + struct flb_route_payload *payload) +{ + int ret; + size_t out_size = 0; + struct flb_input_chunk *chunk = NULL; + struct mk_list *head; + struct flb_router_path *route_path; + int routes_found = 0; + + if (!ins || !payload || !payload->tag || !payload->route) { + flb_info("[router] route_payload_apply_outputs: invalid parameters"); + return -1; + } + + flb_info("[router] route_payload_apply_outputs: processing route '%s' with tag '%s'", + payload->route->name, payload->tag); + + if (!ins->ht_log_chunks || !ins->config) { + flb_info("[router] route_payload_apply_outputs: missing ht_log_chunks or config"); + return -1; + } + + ret = flb_hash_table_get(ins->ht_log_chunks, + payload->tag, + flb_sds_len(payload->tag), + (void **) &chunk, + &out_size); + if (ret == -1 || !chunk || !chunk->routes_mask) { + flb_info("[router] route_payload_apply_outputs: failed to get chunk or routes_mask"); + return -1; + } + + flb_info("[router] route_payload_apply_outputs: found chunk, clearing routes_mask"); + memset(chunk->routes_mask, 0, + sizeof(flb_route_mask_element) * ins->config->route_mask_size); + + flb_info("[router] route_payload_apply_outputs: scanning %zu direct routes", + mk_list_size(&ins->routes_direct)); + + mk_list_foreach(head, &ins->routes_direct) { + route_path = mk_list_entry(head, struct flb_router_path, _head); + + flb_info("[router] route_payload_apply_outputs: checking route_path->route=%p, payload->route=%p, route_path->ins=%p", + route_path->route, payload->route, route_path->ins); + + if (route_path->route != payload->route || !route_path->ins) { + flb_info("[router] route_payload_apply_outputs: skipping route_path (route mismatch or no ins)"); + continue; + } + + flb_info("[router] route_payload_apply_outputs: setting bit for output id=%d, name='%s'", + route_path->ins->id, route_path->ins->name); + + flb_routes_mask_set_bit(chunk->routes_mask, + route_path->ins->id, + ins->config); + routes_found++; + } + + flb_info("[router] route_payload_apply_outputs: found %d matching routes", routes_found); + + // Print the routes mask for debugging + flb_info("[router] route_payload_apply_outputs: routes_mask contents:"); + for (int i = 0; i < ins->config->route_mask_size; i++) { + flb_info("[router] routes_mask[%d] = 0x%08x", i, chunk->routes_mask[i]); + } + + if (flb_routes_mask_is_empty(chunk->routes_mask, ins->config) == FLB_TRUE) { + flb_info("[router] route_payload_apply_outputs: routes_mask is empty, returning -1"); + return -1; + } + + flb_info("[router] route_payload_apply_outputs: success"); + return 0; +} + static int encode_empty_map(char **out_buf, size_t *out_size) { char *buf; @@ -266,11 +345,20 @@ static int build_payload_for_route(struct flb_route_payload *payload, return -1; } + flb_info("[router] build_payload_for_route called for route '%s' with %zu records", + payload->route->name, record_count); + + flb_info("[router] route condition: %p, per_record_routing: %d", + payload->route->condition, payload->route->per_record_routing); + compiled = flb_router_route_get_condition(payload->route); if (!compiled) { + flb_info("[router] no compiled condition found for route '%s'", payload->route->name); return 0; } + flb_info("[router] compiled condition found for route '%s'", payload->route->name); + encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); if (!encoder) { return -1; @@ -459,16 +547,26 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, mk_list_init(&payloads); + flb_info("[router] scanning %zu direct routes", mk_list_size(&ins->routes_direct)); mk_list_foreach(head, &ins->routes_direct) { route_path = mk_list_entry(head, struct flb_router_path, _head); + flb_info("[router] checking route_path: route=%p, condition=%p, per_record_routing=%d", + route_path->route, + route_path->route ? route_path->route->condition : NULL, + route_path->route ? route_path->route->per_record_routing : -1); + if (!route_path->route || (!route_path->route->condition && !route_path->route->per_record_routing)) { + flb_info("[router] skipping route_path (no route, condition, or per_record_routing)"); continue; } + flb_info("[router] processing route '%s'", route_path->route->name); + payload = route_payload_find(&payloads, route_path->route); if (!payload) { + flb_info("[router] creating new payload for route '%s'", route_path->route->name); payload = flb_calloc(1, sizeof(struct flb_route_payload)); if (!payload) { flb_errno(); @@ -477,7 +575,9 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, } payload->route = route_path->route; - payload->is_default = route_path->route->condition->is_default; + payload->is_default = route_path->route->condition ? route_path->route->condition->is_default : 0; + flb_info("[router] payload created: route='%s', is_default=%d", + payload->route->name, payload->is_default); mk_list_add(&payload->_head, &payloads); } @@ -487,11 +587,14 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, } } - if (mk_list_is_empty(&payloads)) { + flb_info("[router] created %zu payloads", mk_list_size(&payloads)); + if (mk_list_size(&payloads) == 0) { + flb_info("[router] no payloads created, returning 0"); return 0; } handled = FLB_TRUE; + flb_info("[router] payloads created, proceeding with processing"); if (!base_tag) { if (ins->tag && ins->tag_len > 0) { @@ -563,17 +666,24 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, cfl_list_entry(head, struct flb_mp_chunk_record, _head); } + flb_info("[router] processing %zu payloads", mk_list_size(&payloads)); mk_list_foreach(head, &payloads) { payload = mk_list_entry(head, struct flb_route_payload, _head); + flb_info("[router] processing payload for route '%s', is_default=%d, total_records=%zu", + payload->route->name, payload->is_default, payload->total_records); + if (payload->is_default) { + flb_info("[router] skipping default route '%s'", payload->route->name); continue; } + flb_info("[router] calling build_payload_for_route for route '%s'", payload->route->name); ret = build_payload_for_route(payload, records_array, record_count, matched_non_default); + flb_info("[router] build_payload_for_route returned %d for route '%s'", ret, payload->route->name); if (ret != 0) { flb_free(records_array); flb_free(matched_non_default); @@ -634,6 +744,13 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, return -1; } + if (route_payload_apply_outputs(ins, payload) != 0) { + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + appended++; } diff --git a/tests/internal/conditionals.c b/tests/internal/conditionals.c index 81df2059c1b..7ce49021fdc 100644 --- a/tests/internal/conditionals.c +++ b/tests/internal/conditionals.c @@ -876,6 +876,22 @@ void test_condition_missing_values() flb_condition_destroy(cond); destroy_test_record(record_data); + /* Test metadata rule when metadata is absent */ + record_data = create_test_record("message", "test log entry"); + TEST_CHECK(record_data != NULL); + + cond = flb_condition_create(FLB_COND_OP_AND); + TEST_CHECK(cond != NULL); + + TEST_CHECK(flb_condition_add_rule(cond, "$stream", FLB_RULE_OP_EQ, + "production", 0, RECORD_CONTEXT_METADATA) == FLB_TRUE); + + result = flb_condition_evaluate(cond, &record_data->chunk); + TEST_CHECK(result == FLB_FALSE); /* Missing metadata should return false */ + + flb_condition_destroy(cond); + destroy_test_record(record_data); + /* Test NOT_IN operator with present field not in array */ record_data = create_test_record("level", "info"); TEST_CHECK(record_data != NULL); @@ -1817,4 +1833,4 @@ TEST_LIST = { {"gte_lte_multiple", test_condition_gte_lte_multiple}, {"gte_lte_border_cases", test_condition_gte_lte_border_cases}, {NULL, NULL} -}; \ No newline at end of file +}; From e662be2aedcfe4c8ef879c8d5ad34e2f2965b5a5 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 21 Oct 2025 11:56:48 -0600 Subject: [PATCH 10/30] wip Signed-off-by: Eduardo Silva --- src/flb_input_log.c | 36 ++++++++++++++++++++++++++++-------- src/flb_task.c | 11 +++++++++++ 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 03e7a4bc065..7f6af7c8652 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -367,10 +367,19 @@ static int build_payload_for_route(struct flb_route_payload *payload, matched = 0; for (i = 0; i < record_count; i++) { - if (flb_condition_evaluate(compiled, records[i]) != FLB_TRUE) { + int condition_result = flb_condition_evaluate(compiled, records[i]); + flb_info("[router] route '%s' record %zu: condition_result=%d", + payload->route->name, i, condition_result); + + if (condition_result != FLB_TRUE) { + flb_info("[router] route '%s' record %zu: condition failed, skipping", + payload->route->name, i); continue; } + flb_info("[router] route '%s' record %zu: condition matched, adding to payload", + payload->route->name, i); + ret = encode_chunk_record(encoder, records[i]); if (ret != 0) { flb_log_event_encoder_destroy(encoder); @@ -426,15 +435,22 @@ static int build_payload_for_default_route(struct flb_route_payload *payload, matched = 0; for (i = 0; i < record_count; i++) { + flb_info("[router] default route record %zu: matched_non_default[%zu]=%d", + i, i, matched_non_default[i]); + if (matched_non_default[i]) { + flb_info("[router] default route record %zu: already matched, skipping", i); continue; } if (compiled && flb_condition_evaluate(compiled, records[i]) != FLB_TRUE) { + flb_info("[router] default route record %zu: condition failed, skipping", i); continue; } + flb_info("[router] default route record %zu: adding to default payload", i); + ret = encode_chunk_record(encoder, records[i]); if (ret != 0) { flb_log_event_encoder_destroy(encoder); @@ -576,14 +592,18 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, payload->route = route_path->route; payload->is_default = route_path->route->condition ? route_path->route->condition->is_default : 0; - flb_info("[router] payload created: route='%s', is_default=%d", - payload->route->name, payload->is_default); - mk_list_add(&payload->_head, &payloads); - } - if (append_output_to_payload_tag(payload, route_path->ins) != 0) { - route_payload_list_destroy(&payloads); - return -1; + /* Use the route name as the tag */ + payload->tag = flb_sds_create(route_path->route->name); + if (!payload->tag) { + flb_free(payload); + route_payload_list_destroy(&payloads); + return -1; + } + + flb_info("[router] payload created: route='%s', is_default=%d, tag='%s'", + payload->route->name, payload->is_default, payload->tag); + mk_list_add(&payload->_head, &payloads); } } diff --git a/src/flb_task.c b/src/flb_task.c index c77267bcd32..adbf806a02b 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -440,6 +440,17 @@ struct flb_task *flb_task_create(uint64_t ref_id, o_ins = route_path->ins; + if (task_ic->routes_mask && + flb_routes_mask_get_bit(task_ic->routes_mask, + o_ins->id, + o_ins->config) == 0) { + flb_debug("[task] skipping direct route for task=%p id=%i output=%s", + task, + task->id, + flb_output_name(o_ins)); + continue; + } + route = flb_calloc(1, sizeof(struct flb_task_route)); if (!route) { flb_errno(); From 38b010a6170ba2a50bda121eeab94f3e49c28a0f Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 26 Oct 2025 21:53:49 -0600 Subject: [PATCH 11/30] router: use cfl_list for router path Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_router.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index 06df3868b1b..8b34e0af160 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -43,7 +43,7 @@ struct flb_router_chunk_context { struct flb_router_path { struct flb_output_instance *ins; struct flb_route *route; - struct mk_list _head; + struct cfl_list _head; }; static inline int flb_router_match_type(int in_event_type, From 966387cd53bdff6d7cc5febc4636d6389359d5c5 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 26 Oct 2025 21:54:18 -0600 Subject: [PATCH 12/30] input_log: code cleanup and fix types for linked lists Signed-off-by: Eduardo Silva --- src/flb_input_log.c | 226 +++++++++++++------------------------------- 1 file changed, 68 insertions(+), 158 deletions(-) diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 7f6af7c8652..2a72e43c3e2 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -46,7 +46,7 @@ struct flb_route_payload { char *data; size_t size; size_t total_records; - struct mk_list _head; + struct cfl_list _head; }; static void route_payload_destroy(struct flb_route_payload *payload) @@ -55,8 +55,8 @@ static void route_payload_destroy(struct flb_route_payload *payload) return; } - if (!mk_list_entry_is_orphan(&payload->_head)) { - mk_list_del(&payload->_head); + if (!cfl_list_entry_is_orphan(&payload->_head)) { + cfl_list_del(&payload->_head); } if (payload->tag) { @@ -70,18 +70,18 @@ static void route_payload_destroy(struct flb_route_payload *payload) flb_free(payload); } -static struct flb_route_payload *route_payload_find(struct mk_list *payloads, +static struct flb_route_payload *route_payload_find(struct cfl_list *payloads, struct flb_route *route) { - struct mk_list *head; + struct cfl_list *head; struct flb_route_payload *payload; if (!payloads || !route) { return NULL; } - mk_list_foreach(head, payloads) { - payload = mk_list_entry(head, struct flb_route_payload, _head); + cfl_list_foreach(head, payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); if (payload->route == route) { return payload; @@ -91,67 +91,21 @@ static struct flb_route_payload *route_payload_find(struct mk_list *payloads, return NULL; } -static int append_output_to_payload_tag(struct flb_route_payload *payload, - struct flb_output_instance *out) -{ - const char *identifier; - - if (!payload || !out) { - return -1; - } - - if (out->alias) { - identifier = out->alias; - } - else { - identifier = flb_output_name(out); - } - - if (!identifier) { - return -1; - } - - if (!payload->tag) { - payload->tag = flb_sds_create(identifier); - if (!payload->tag) { - return -1; - } - } - else { - payload->tag = flb_sds_cat(payload->tag, ",", 1); - if (!payload->tag) { - return -1; - } - - payload->tag = flb_sds_cat(payload->tag, identifier, strlen(identifier)); - if (!payload->tag) { - return -1; - } - } - - return 0; -} - static int route_payload_apply_outputs(struct flb_input_instance *ins, struct flb_route_payload *payload) { int ret; + int routes_found = 0; size_t out_size = 0; + struct cfl_list *head; struct flb_input_chunk *chunk = NULL; - struct mk_list *head; struct flb_router_path *route_path; - int routes_found = 0; if (!ins || !payload || !payload->tag || !payload->route) { - flb_info("[router] route_payload_apply_outputs: invalid parameters"); return -1; } - flb_info("[router] route_payload_apply_outputs: processing route '%s' with tag '%s'", - payload->route->name, payload->tag); - if (!ins->ht_log_chunks || !ins->config) { - flb_info("[router] route_payload_apply_outputs: missing ht_log_chunks or config"); return -1; } @@ -161,51 +115,25 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, (void **) &chunk, &out_size); if (ret == -1 || !chunk || !chunk->routes_mask) { - flb_info("[router] route_payload_apply_outputs: failed to get chunk or routes_mask"); return -1; } - flb_info("[router] route_payload_apply_outputs: found chunk, clearing routes_mask"); - memset(chunk->routes_mask, 0, - sizeof(flb_route_mask_element) * ins->config->route_mask_size); - - flb_info("[router] route_payload_apply_outputs: scanning %zu direct routes", - mk_list_size(&ins->routes_direct)); - - mk_list_foreach(head, &ins->routes_direct) { - route_path = mk_list_entry(head, struct flb_router_path, _head); - - flb_info("[router] route_payload_apply_outputs: checking route_path->route=%p, payload->route=%p, route_path->ins=%p", - route_path->route, payload->route, route_path->ins); - + memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * ins->config->route_mask_size); + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); if (route_path->route != payload->route || !route_path->ins) { - flb_info("[router] route_payload_apply_outputs: skipping route_path (route mismatch or no ins)"); continue; } - - flb_info("[router] route_payload_apply_outputs: setting bit for output id=%d, name='%s'", - route_path->ins->id, route_path->ins->name); - flb_routes_mask_set_bit(chunk->routes_mask, route_path->ins->id, ins->config); routes_found++; } - flb_info("[router] route_payload_apply_outputs: found %d matching routes", routes_found); - - // Print the routes mask for debugging - flb_info("[router] route_payload_apply_outputs: routes_mask contents:"); - for (int i = 0; i < ins->config->route_mask_size; i++) { - flb_info("[router] routes_mask[%d] = 0x%08x", i, chunk->routes_mask[i]); - } - if (flb_routes_mask_is_empty(chunk->routes_mask, ins->config) == FLB_TRUE) { - flb_info("[router] route_payload_apply_outputs: routes_mask is empty, returning -1"); return -1; } - flb_info("[router] route_payload_apply_outputs: success"); return 0; } @@ -481,36 +409,34 @@ static int build_payload_for_default_route(struct flb_route_payload *payload, return 0; } -static void route_payload_list_destroy(struct mk_list *payloads) +static void route_payload_list_destroy(struct cfl_list *payloads) { - struct mk_list *head; - struct mk_list *tmp; + struct cfl_list *head; + struct cfl_list *tmp; struct flb_route_payload *payload; if (!payloads) { return; } - mk_list_foreach_safe(head, tmp, payloads) { - payload = mk_list_entry(head, struct flb_route_payload, _head); + cfl_list_foreach_safe(head, tmp, payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); route_payload_destroy(payload); } } static int input_has_conditional_routes(struct flb_input_instance *ins) { - struct mk_list *head; + struct cfl_list *head; struct flb_router_path *route_path; if (!ins) { return FLB_FALSE; } - mk_list_foreach(head, &ins->routes_direct) { - route_path = mk_list_entry(head, struct flb_router_path, _head); - - if (route_path->route && - (route_path->route->condition || route_path->route->per_record_routing)) { + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + if (route_path->route && (route_path->route->condition || route_path->route->per_record_routing)) { return FLB_TRUE; } } @@ -529,19 +455,22 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, int appended; int handled; int context_initialized = FLB_FALSE; - struct mk_list payloads; - struct mk_list *head; - struct mk_list *tmp; + size_t out_size = 0; + uint8_t *matched_non_default = NULL; + struct cfl_list payloads; + struct cfl_list *head; + struct cfl_list *tmp; struct flb_router_path *route_path; struct flb_route_payload *payload; struct flb_router_chunk_context context; struct flb_event_chunk *chunk; struct flb_mp_chunk_record **records_array = NULL; - uint8_t *matched_non_default = NULL; + struct flb_input_chunk *orphaned_chunk = NULL; + size_t record_count; size_t index; - const char *base_tag = tag; size_t base_tag_len = tag_len; + const char *base_tag = tag; handled = FLB_FALSE; @@ -549,40 +478,24 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, return 0; } - if (mk_list_size(&ins->routes_direct) == 0) { - flb_info("[router] no direct routes found"); + if (cfl_list_size(&ins->routes_direct) == 0) { return 0; } if (input_has_conditional_routes(ins) == FLB_FALSE) { - flb_info("[router] no conditional routes found"); return 0; } - flb_info("[router] conditional routing triggered for %zu routes", mk_list_size(&ins->routes_direct)); - - mk_list_init(&payloads); - - flb_info("[router] scanning %zu direct routes", mk_list_size(&ins->routes_direct)); - mk_list_foreach(head, &ins->routes_direct) { - route_path = mk_list_entry(head, struct flb_router_path, _head); - - flb_info("[router] checking route_path: route=%p, condition=%p, per_record_routing=%d", - route_path->route, - route_path->route ? route_path->route->condition : NULL, - route_path->route ? route_path->route->per_record_routing : -1); - + cfl_list_init(&payloads); + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); if (!route_path->route || (!route_path->route->condition && !route_path->route->per_record_routing)) { - flb_info("[router] skipping route_path (no route, condition, or per_record_routing)"); continue; } - flb_info("[router] processing route '%s'", route_path->route->name); - payload = route_payload_find(&payloads, route_path->route); if (!payload) { - flb_info("[router] creating new payload for route '%s'", route_path->route->name); payload = flb_calloc(1, sizeof(struct flb_route_payload)); if (!payload) { flb_errno(); @@ -600,22 +513,15 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, route_payload_list_destroy(&payloads); return -1; } - - flb_info("[router] payload created: route='%s', is_default=%d, tag='%s'", - payload->route->name, payload->is_default, payload->tag); - mk_list_add(&payload->_head, &payloads); + cfl_list_add(&payload->_head, &payloads); } } - flb_info("[router] created %zu payloads", mk_list_size(&payloads)); - if (mk_list_size(&payloads) == 0) { - flb_info("[router] no payloads created, returning 0"); + if (cfl_list_size(&payloads) == 0) { return 0; } handled = FLB_TRUE; - flb_info("[router] payloads created, proceeding with processing"); - if (!base_tag) { if (ins->tag && ins->tag_len > 0) { base_tag = ins->tag; @@ -665,6 +571,14 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, records_array = flb_calloc(record_count, sizeof(struct flb_mp_chunk_record *)); + if (!records_array) { + flb_errno(); + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + matched_non_default = flb_calloc(record_count, sizeof(uint8_t)); if (!records_array || !matched_non_default) { flb_errno(); @@ -682,28 +596,19 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, index = 0; cfl_list_foreach(head, &context.chunk_cobj->records) { - records_array[index++] = - cfl_list_entry(head, struct flb_mp_chunk_record, _head); + records_array[index++] = cfl_list_entry(head, struct flb_mp_chunk_record, _head); } - flb_info("[router] processing %zu payloads", mk_list_size(&payloads)); - mk_list_foreach(head, &payloads) { - payload = mk_list_entry(head, struct flb_route_payload, _head); - - flb_info("[router] processing payload for route '%s', is_default=%d, total_records=%zu", - payload->route->name, payload->is_default, payload->total_records); - + cfl_list_foreach(head, &payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); if (payload->is_default) { - flb_info("[router] skipping default route '%s'", payload->route->name); continue; } - flb_info("[router] calling build_payload_for_route for route '%s'", payload->route->name); ret = build_payload_for_route(payload, records_array, record_count, matched_non_default); - flb_info("[router] build_payload_for_route returned %d for route '%s'", ret, payload->route->name); if (ret != 0) { flb_free(records_array); flb_free(matched_non_default); @@ -714,9 +619,8 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, } } - mk_list_foreach(head, &payloads) { - payload = mk_list_entry(head, struct flb_route_payload, _head); - + cfl_list_foreach(head, &payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); if (!payload->is_default) { continue; } @@ -738,8 +642,8 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, flb_free(records_array); flb_free(matched_non_default); - mk_list_foreach_safe(head, tmp, &payloads) { - payload = mk_list_entry(head, struct flb_route_payload, _head); + cfl_list_foreach_safe(head, tmp, &payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); if (payload->total_records <= 0 || !payload->data) { route_payload_destroy(payload); @@ -747,8 +651,8 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, } appended = 0; - mk_list_foreach(head, &payloads) { - payload = mk_list_entry(head, struct flb_route_payload, _head); + cfl_list_foreach(head, &payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, @@ -765,6 +669,22 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, } if (route_payload_apply_outputs(ins, payload) != 0) { + /* Clean up the orphaned chunk from ht_log_chunks */ + orphaned_chunk = NULL; + out_size = 0; + ret = flb_hash_table_get(ins->ht_log_chunks, + payload->tag, + flb_sds_len(payload->tag), + (void **) &orphaned_chunk, + &out_size); + if (ret >= 0 && orphaned_chunk) { + flb_hash_table_del_ptr(ins->ht_log_chunks, + payload->tag, + flb_sds_len(payload->tag), + (void *) orphaned_chunk); + /* Destroy the orphaned chunk completely */ + flb_input_chunk_destroy(orphaned_chunk, FLB_TRUE); + } flb_router_chunk_context_destroy(&context); route_payload_list_destroy(&payloads); flb_event_chunk_destroy(chunk); @@ -889,16 +809,6 @@ int flb_input_log_append_records(struct flb_input_instance *ins, const void *buf, size_t buf_size) { int ret; - char tag_copy[128]; - - strncpy(tag_copy, tag, sizeof(tag_copy)); - tag_copy[sizeof(tag_copy) - 1] = '\0'; - - printf("--------------------------------\n"); - printf("appending records: tag: %s, tag_len: %zu, records: %zu\n", tag_copy, tag_len, records); - flb_pack_print(buf, buf_size); - printf("--------------------------------\n"); - printf("\n"); ret = input_log_append(ins, 0, records, tag, tag_len, buf, buf_size); return ret; From b923d4ad0cddef9e62c44d9f9f5f9a62d859cd3c Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 26 Oct 2025 21:55:06 -0600 Subject: [PATCH 13/30] input: fix data type for routes_direct Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_input.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index accde0b910b..97f1194ab49 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -335,10 +335,10 @@ struct flb_input_instance { struct mk_list _head; /* link to config->inputs */ - struct mk_list routes_direct; /* direct routes set by API */ - struct mk_list routes; /* flb_router_path's list */ - struct mk_list properties; /* properties / configuration */ - struct mk_list collectors; /* collectors */ + struct cfl_list routes_direct; /* direct routes set by API */ + struct mk_list routes; /* flb_router_path's list */ + struct mk_list properties; /* properties / configuration */ + struct mk_list collectors; /* collectors */ /* Storage Chunks */ struct mk_list chunks; /* linked list of all chunks */ From 416371253e2e8c7af646436e488f34321aa66c53 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 26 Oct 2025 21:55:36 -0600 Subject: [PATCH 14/30] router_config: fix data type for linked list Signed-off-by: Eduardo Silva --- src/flb_router_config.c | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/flb_router_config.c b/src/flb_router_config.c index b3ae699b4c0..098b79018c5 100644 --- a/src/flb_router_config.c +++ b/src/flb_router_config.c @@ -1207,15 +1207,15 @@ static struct flb_output_instance *find_output_instance(struct flb_config *confi static int input_has_direct_route(struct flb_input_instance *in, struct flb_output_instance *out) { - struct mk_list *head; + struct cfl_list *head; struct flb_router_path *path; if (!in || !out) { return FLB_FALSE; } - mk_list_foreach(head, &in->routes_direct) { - path = mk_list_entry(head, struct flb_router_path, _head); + cfl_list_foreach(head, &in->routes_direct) { + path = cfl_list_entry(head, struct flb_router_path, _head); if (path->ins == out) { return FLB_TRUE; } @@ -1251,6 +1251,7 @@ static int output_supports_signals(struct flb_output_instance *out, uint32_t sig int flb_router_apply_config(struct flb_config *config) { + int created = 0; struct cfl_list *input_head; struct cfl_list *route_head; struct cfl_list *output_head; @@ -1260,15 +1261,12 @@ int flb_router_apply_config(struct flb_config *config) struct flb_input_instance *input_ins; struct flb_output_instance *output_ins; struct flb_output_instance *fallback_ins; - int created; + struct flb_router_path *path; if (!config) { return 0; } - flb_debug("[router] applying router configuration"); - created = 0; - cfl_list_foreach(input_head, &config->input_routes) { input_routes = cfl_list_entry(input_head, struct flb_input_routes, _head); @@ -1318,10 +1316,7 @@ int flb_router_apply_config(struct flb_config *config) } if (flb_router_connect_direct(input_ins, output_ins) == 0) { - struct flb_router_path *path; - - path = mk_list_entry_last(&input_ins->routes_direct, - struct flb_router_path, _head); + path = cfl_list_entry_last(&input_ins->routes_direct, struct flb_router_path, _head); path->route = route; created++; flb_debug("[router] connected input '%s' route '%s' to output '%s'", From 55f9e0e56bcba58e40874c6d06746b024d0395ba Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 26 Oct 2025 21:55:51 -0600 Subject: [PATCH 15/30] task: fix data type for linked list Signed-off-by: Eduardo Silva --- src/flb_task.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/flb_task.c b/src/flb_task.c index adbf806a02b..ca5e18d3289 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -367,7 +367,7 @@ struct flb_task *flb_task_create(uint64_t ref_id, struct flb_router_path *route_path; struct flb_output_instance *o_ins; struct flb_input_chunk *task_ic; - struct mk_list *i_head; + struct cfl_list *i_head; struct mk_list *o_head; struct flb_router_chunk_context router_context; int router_context_initialized = FLB_FALSE; @@ -426,11 +426,11 @@ struct flb_task *flb_task_create(uint64_t ref_id, #endif /* Direct connects betweek input <> outputs (API based) */ - if (mk_list_size(&i_ins->routes_direct) > 0) { + if (cfl_list_size(&i_ins->routes_direct) > 0) { direct_count = 0; - mk_list_foreach(i_head, &i_ins->routes_direct) { - route_path = mk_list_entry(i_head, struct flb_router_path, _head); + cfl_list_foreach(i_head, &i_ins->routes_direct) { + route_path = cfl_list_entry(i_head, struct flb_router_path, _head); if (flb_router_path_should_route(task->event_chunk, &router_context, From 2c2a5cfd133cf8dcb4fc38403cb8bfe74a9dfe86 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sun, 26 Oct 2025 22:14:35 -0600 Subject: [PATCH 16/30] input_log: preserve non-conditional routes when conditional routing is used Remove early return in input_log_append() that prevented non-conditional routes from receiving data when conditional routes existed. This ensures both conditional and non-conditional routes receive data in mixed configurations, preventing silent data loss from unconditional outputs. Signed-off-by: Eduardo Silva --- src/flb_input_log.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 2a72e43c3e2..15208ed9a79 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -756,13 +756,11 @@ static int input_log_append(struct flb_input_instance *ins, return -1; } - if (ret > 0) { - if (processor_is_active && buf != out_buf) { - flb_free(out_buf); - } - return 0; - } - + /* + * Always call flb_input_chunk_append_raw to ensure non-conditional routes + * receive data even when conditional routes exist. The conditional routing + * should be additive, not exclusive. + */ ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, tag, tag_len, out_buf, out_size); From 20eeb44a464b0067a8fad570aa19343f369efad8 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 07:05:21 -0600 Subject: [PATCH 17/30] input_log: handle deferred chunk creation in threaded inputs with conditional routing Modify route_payload_apply_outputs() to gracefully handle the case where chunks don't exist yet in threaded mode due to asynchronous ring buffer processing. This prevents data loss for threaded inputs using conditional routing by falling back to normal routing instead of treating missing chunks as fatal errors. Signed-off-by: Eduardo Silva --- src/flb_input_log.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 15208ed9a79..d59ad8e5bcc 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -115,7 +115,14 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, (void **) &chunk, &out_size); if (ret == -1 || !chunk || !chunk->routes_mask) { - return -1; + /* For threaded inputs, chunk may not exist yet - this is expected */ + if (flb_input_is_threaded(ins)) { + /* In threaded mode, routing will be handled when chunk is materialized */ + flb_plg_debug(ins, "chunk not yet materialized for threaded input, " + "routing will be handled asynchronously"); + return 0; /* Success - don't treat as error */ + } + return -1; /* Error for non-threaded inputs */ } memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * ins->config->route_mask_size); From 49e41fe23482cd5a3f6171bff9b32c267be7f6e6 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 07:21:32 -0600 Subject: [PATCH 18/30] input: include cfl header Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_input.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 97f1194ab49..fc01d0e9dd4 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -52,6 +52,8 @@ #include #include +#include + #include #include @@ -336,7 +338,7 @@ struct flb_input_instance { struct mk_list _head; /* link to config->inputs */ struct cfl_list routes_direct; /* direct routes set by API */ - struct mk_list routes; /* flb_router_path's list */ + struct cfl_list routes; /* flb_router_path's list */ struct mk_list properties; /* properties / configuration */ struct mk_list collectors; /* collectors */ From b727628b444ebd28403694ec86b4fba27c7a9fad Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 07:21:58 -0600 Subject: [PATCH 19/30] router: use cfl_list intead of mk_list Signed-off-by: Eduardo Silva --- src/flb_router.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/flb_router.c b/src/flb_router.c index 0c8f749d835..47251d52023 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -156,7 +156,7 @@ int flb_router_connect(struct flb_input_instance *in, p->ins = out; p->route = NULL; - mk_list_add(&p->_head, &in->routes); + cfl_list_add(&p->_head, &in->routes); return 0; } @@ -174,7 +174,7 @@ int flb_router_connect_direct(struct flb_input_instance *in, p->ins = out; p->route = NULL; - mk_list_add(&p->_head, &in->routes_direct); + cfl_list_add(&p->_head, &in->routes_direct); return 0; } @@ -273,9 +273,9 @@ int flb_router_io_set(struct flb_config *config) void flb_router_exit(struct flb_config *config) { struct mk_list *tmp; - struct mk_list *r_tmp; + struct cfl_list *r_tmp; struct mk_list *head; - struct mk_list *r_head; + struct cfl_list *r_head; struct flb_input_instance *in; struct flb_router_path *r; @@ -284,16 +284,16 @@ void flb_router_exit(struct flb_config *config) in = mk_list_entry(head, struct flb_input_instance, _head); /* Iterate instance routes */ - mk_list_foreach_safe(r_head, r_tmp, &in->routes) { - r = mk_list_entry(r_head, struct flb_router_path, _head); - mk_list_del(&r->_head); + cfl_list_foreach_safe(r_head, r_tmp, &in->routes) { + r = cfl_list_entry(r_head, struct flb_router_path, _head); + cfl_list_del(&r->_head); flb_free(r); } /* Iterate instance routes direct */ - mk_list_foreach_safe(r_head, r_tmp, &in->routes_direct) { - r = mk_list_entry(r_head, struct flb_router_path, _head); - mk_list_del(&r->_head); + cfl_list_foreach_safe(r_head, r_tmp, &in->routes_direct) { + r = cfl_list_entry(r_head, struct flb_router_path, _head); + cfl_list_del(&r->_head); flb_free(r); } } From 28946e86a1ebc1e71b602d22bcb8d1553248db32 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 07:41:32 -0600 Subject: [PATCH 20/30] input: fix llist initialization Signed-off-by: Eduardo Silva --- src/flb_input.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/flb_input.c b/src/flb_input.c index 1d380b6ed7b..97ebb1e4ad9 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -352,8 +352,8 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, instance->host.ipv6 = FLB_FALSE; /* Initialize list heads */ - mk_list_init(&instance->routes_direct); - mk_list_init(&instance->routes); + cfl_list_init(&instance->routes_direct); + cfl_list_init(&instance->routes); mk_list_init(&instance->tasks); mk_list_init(&instance->chunks); mk_list_init(&instance->collectors); From 4a82d5d7cbcefb2c541048afa57127c4ad85b3c4 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 07:41:50 -0600 Subject: [PATCH 21/30] sosreport: fix list iterator api Signed-off-by: Eduardo Silva --- src/flb_sosreport.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/flb_sosreport.c b/src/flb_sosreport.c index 3db87bd1336..5074aa5aeb8 100644 --- a/src/flb_sosreport.c +++ b/src/flb_sosreport.c @@ -167,7 +167,7 @@ int flb_sosreport(struct flb_config *config) { char tmp[32]; struct mk_list *head; - struct mk_list *head_r; + struct cfl_list *head_r; struct flb_input_plugin *in; struct flb_filter_plugin *filter; struct flb_output_plugin *out; @@ -266,10 +266,10 @@ int flb_sosreport(struct flb_config *config) print_properties(&ins_in->properties); /* Fixed Routes */ - if (mk_list_is_empty(&ins_in->routes) != 0) { + if (!cfl_list_is_empty(&ins_in->routes)) { printf(" Routes\t\t"); - mk_list_foreach(head_r, &ins_in->routes) { - route = mk_list_entry(head_r, struct flb_router_path, _head); + cfl_list_foreach(head_r, &ins_in->routes) { + route = cfl_list_entry(head_r, struct flb_router_path, _head); printf("%s ", route->ins->name); } printf("\n"); From cb67d1c258632e8fb6fb52b902f3f0b78223dce4 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 07:42:07 -0600 Subject: [PATCH 22/30] tests: internal: router_config: fix lists API Signed-off-by: Eduardo Silva --- tests/internal/router_config.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/internal/router_config.c b/tests/internal/router_config.c index 42c19ec807e..9f4c888b3d1 100644 --- a/tests/internal/router_config.c +++ b/tests/internal/router_config.c @@ -774,8 +774,8 @@ static void setup_test_instances(struct flb_config *config, memset(input, 0, sizeof(struct flb_input_instance)); mk_list_init(&input->_head); - mk_list_init(&input->routes_direct); - mk_list_init(&input->routes); + cfl_list_init(&input->routes_direct); + cfl_list_init(&input->routes); mk_list_init(&input->tasks); mk_list_init(&input->chunks); mk_list_init(&input->collectors); @@ -843,9 +843,9 @@ void test_router_apply_config_success() cfl_list_add(&route_output._head, &route.outputs); TEST_CHECK(flb_router_apply_config(&config) == 0); - TEST_CHECK(mk_list_size(&input.routes_direct) == 1); + TEST_CHECK(cfl_list_size(&input.routes_direct) == 1); - path = mk_list_entry(input.routes_direct.next, struct flb_router_path, _head); + path = cfl_list_entry(input.routes_direct.next, struct flb_router_path, _head); TEST_CHECK(path->ins == &output); flb_router_exit(&config); @@ -891,8 +891,8 @@ void test_router_apply_config_missing_output() TEST_CHECK(flb_router_apply_config(&config) == 0); - /* note mk_list_is_empty return 0 if is empty */ - TEST_CHECK(!mk_list_is_empty(&input.routes_direct)); + /* When output is missing, no routes should be created */ + TEST_CHECK(cfl_list_is_empty(&input.routes_direct)); flb_router_exit(&config); From c7949cc5d52801011288c3d0c74b2faad4868b3d Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 21:27:28 -0600 Subject: [PATCH 23/30] input_chunk: expose flb_input_chunk_get_real_size() Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_input_chunk.h | 1 + src/flb_input_chunk.c | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index 696e40ac4ba..b9b55abe28d 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -113,6 +113,7 @@ int flb_input_chunk_get_tag(struct flb_input_chunk *ic, void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins); void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data); ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic); +ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic); size_t flb_input_chunk_set_limits(struct flb_input_instance *in); size_t flb_input_chunk_total_size(struct flb_input_instance *in); struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index b8f431452c4..9f1c5af4e33 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -96,7 +96,6 @@ static int flb_input_chunk_drop_task_route( struct flb_output_instance *o_ins, ssize_t *dropped_record_count); -static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic); static ssize_t get_input_chunk_record_count(struct flb_input_chunk *input_chunk) { @@ -283,7 +282,7 @@ ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic) * is used to track the size of chunks in filesystem so we need to call * cio_chunk_get_real_size to return the original size in the file system */ -static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic) +ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic) { ssize_t meta_size; ssize_t size; From ac76b2016b5d8cabae32a6d1a688e43ede2ca935 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 21:27:55 -0600 Subject: [PATCH 24/30] tests: internal: input_chunk: remove unused code Signed-off-by: Eduardo Silva --- tests/internal/input_chunk.c | 36 ------------------------------------ 1 file changed, 36 deletions(-) diff --git a/tests/internal/input_chunk.c b/tests/internal/input_chunk.c index 945ddbbf13b..773eb03cf9c 100644 --- a/tests/internal/input_chunk.c +++ b/tests/internal/input_chunk.c @@ -309,42 +309,6 @@ void flb_test_input_chunk_dropping_chunks() flb_destroy(ctx); } -/* - * When chunk is set to DOWN from memory, data_size is set to 0 and - * cio_chunk_get_content_size(1) returns the data_size. fs_chunks_size - * is used to track the size of chunks in filesystem so we need to call - * cio_chunk_get_real_size to return the original size in the file system - */ -static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic) -{ - ssize_t meta_size; - ssize_t size; - - size = cio_chunk_get_real_size(ic->chunk); - - if (size != 0) { - return size; - } - - // Real size is not synced to chunk yet - size = flb_input_chunk_get_size(ic); - if (size == 0) { - flb_debug("[input chunk] no data in the chunk %s", - flb_input_chunk_get_name(ic)); - return -1; - } - - meta_size = cio_meta_size(ic->chunk); - size += meta_size - /* See https://github.com/edsiper/chunkio#file-layout for more details */ - + 2 /* HEADER BYTES */ - + 4 /* CRC32 */ - + 16 /* PADDING */ - + 2; /* METADATA LENGTH BYTES */ - - return size; -} - static int gen_buf(msgpack_sbuffer *mp_sbuf, char *buf, size_t buf_size) { msgpack_unpacked result; From a7e4c925f97fecb04834099df54a6e488587a586 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 21:30:18 -0600 Subject: [PATCH 25/30] input_log: implement conditional routing with non-conditional route preservation - Add input_chunk_remove_conditional_routes() to prevent duplicate routing - Preserve non-conditional routes when conditional routing is used - Fix base tag length calculation for proper chunk identification - Ensure conditional routing is additive rather than exclusive Signed-off-by: Eduardo Silva --- src/flb_input_log.c | 134 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 123 insertions(+), 11 deletions(-) diff --git a/src/flb_input_log.c b/src/flb_input_log.c index d59ad8e5bcc..8989c451499 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -19,11 +19,13 @@ #include "fluent-bit/flb_pack.h" #include +#include #include #include #include #include #include +#include #include #include #include @@ -36,6 +38,8 @@ #include #include +#include + #include #include @@ -97,6 +101,7 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, int ret; int routes_found = 0; size_t out_size = 0; + ssize_t chunk_size; struct cfl_list *head; struct flb_input_chunk *chunk = NULL; struct flb_router_path *route_path; @@ -115,14 +120,7 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, (void **) &chunk, &out_size); if (ret == -1 || !chunk || !chunk->routes_mask) { - /* For threaded inputs, chunk may not exist yet - this is expected */ - if (flb_input_is_threaded(ins)) { - /* In threaded mode, routing will be handled when chunk is materialized */ - flb_plg_debug(ins, "chunk not yet materialized for threaded input, " - "routing will be handled asynchronously"); - return 0; /* Success - don't treat as error */ - } - return -1; /* Error for non-threaded inputs */ + return -1; } memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * ins->config->route_mask_size); @@ -141,6 +139,14 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, return -1; } + if (chunk->fs_counted == FLB_FALSE) { + chunk_size = flb_input_chunk_get_real_size(chunk); + if (chunk_size > 0) { + flb_input_chunk_update_output_instances(chunk, + (size_t) chunk_size); + } + } + return 0; } @@ -432,6 +438,65 @@ static void route_payload_list_destroy(struct cfl_list *payloads) } } +static void input_chunk_remove_conditional_routes(struct flb_input_instance *ins, + struct flb_input_chunk *chunk) +{ + ssize_t chunk_size; + size_t chunk_size_sz; + struct cfl_list *head; + struct flb_router_path *route_path; + + if (!ins || !chunk || !chunk->routes_mask || !ins->config) { + return; + } + + chunk_size = -1; + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + + if (!route_path->route || !route_path->ins) { + continue; + } + + if (!route_path->route->condition && + !route_path->route->per_record_routing) { + continue; + } + + if (flb_routes_mask_get_bit(chunk->routes_mask, + route_path->ins->id, + ins->config) == 0) { + continue; + } + + flb_routes_mask_clear_bit(chunk->routes_mask, + route_path->ins->id, + ins->config); + + if (route_path->ins->total_limit_size == -1 || + chunk->fs_counted == FLB_FALSE) { + continue; + } + + if (chunk_size == -1) { + chunk_size = flb_input_chunk_get_real_size(chunk); + if (chunk_size <= 0) { + chunk_size = 0; + } + } + + if (chunk_size > 0) { + chunk_size_sz = (size_t) chunk_size; + if (route_path->ins->fs_chunks_size > chunk_size_sz) { + route_path->ins->fs_chunks_size -= chunk_size_sz; + } + else { + route_path->ins->fs_chunks_size = 0; + } + } + } +} + static int input_has_conditional_routes(struct flb_input_instance *ins) { struct cfl_list *head; @@ -493,6 +558,13 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, return 0; } + /* Conditional routing not supported for threaded inputs */ + if (flb_input_is_threaded(ins)) { + flb_plg_warn(ins, "conditional routing not supported for threaded inputs, " + "falling back to normal routing"); + return 0; + } + cfl_list_init(&payloads); cfl_list_foreach(head, &ins->routes_direct) { route_path = cfl_list_entry(head, struct flb_router_path, _head); @@ -539,6 +611,9 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins, base_tag_len = strlen(ins->name); } } + else if (base_tag_len == 0) { + base_tag_len = strlen(base_tag); + } chunk = flb_event_chunk_create(FLB_EVENT_TYPE_LOGS, records, @@ -717,9 +792,15 @@ static int input_log_append(struct flb_input_instance *ins, const void *buf, size_t buf_size) { int ret; + int conditional_result; + int conditional_handled = FLB_FALSE; int processor_is_active; void *out_buf = (void *) buf; + size_t dummy = 0; size_t out_size = buf_size; + const char *base_tag = tag; + size_t base_tag_len = tag_len; + struct flb_input_chunk *chunk = NULL; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -754,15 +835,33 @@ static int input_log_append(struct flb_input_instance *ins, } } - ret = split_and_append_route_payloads(ins, records, tag, tag_len, - out_buf, out_size); - if (ret < 0) { + if (!base_tag) { + if (ins->tag && ins->tag_len > 0) { + base_tag = ins->tag; + base_tag_len = ins->tag_len; + } + else { + base_tag = ins->name; + base_tag_len = strlen(ins->name); + } + } + else if (base_tag_len == 0) { + base_tag_len = strlen(base_tag); + } + + conditional_result = split_and_append_route_payloads(ins, records, tag, tag_len, + out_buf, out_size); + if (conditional_result < 0) { if (processor_is_active && buf != out_buf) { flb_free(out_buf); } return -1; } + if (conditional_result > 0) { + conditional_handled = FLB_TRUE; + } + /* * Always call flb_input_chunk_append_raw to ensure non-conditional routes * receive data even when conditional routes exist. The conditional routing @@ -771,6 +870,19 @@ static int input_log_append(struct flb_input_instance *ins, ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, tag, tag_len, out_buf, out_size); + if (ret == 0 && conditional_handled == FLB_TRUE && base_tag) { + chunk = NULL; + dummy = 0; + + if (flb_hash_table_get(ins->ht_log_chunks, + base_tag, + base_tag_len, + (void **) &chunk, + &dummy) >= 0 && chunk) { + input_chunk_remove_conditional_routes(ins, chunk); + } + } + if (processor_is_active && buf != out_buf) { flb_free(out_buf); } From 8183d93f4bf03776fb24a117403c728233c2805f Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 27 Oct 2025 21:46:53 -0600 Subject: [PATCH 26/30] input_log: recompute per-output storage accounting Signed-off-by: Eduardo Silva --- src/flb_input_log.c | 47 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 8989c451499..8dbfb398f45 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -101,6 +101,7 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, int ret; int routes_found = 0; size_t out_size = 0; + size_t chunk_size_sz = 0; ssize_t chunk_size; struct cfl_list *head; struct flb_input_chunk *chunk = NULL; @@ -123,6 +124,52 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, return -1; } + if (chunk->fs_counted == FLB_TRUE) { + chunk_size = flb_input_chunk_get_real_size(chunk); + if (chunk_size > 0) { + chunk_size_sz = (size_t) chunk_size; + } + else { + chunk_size = 0; + } + } + else { + chunk_size = 0; + } + + if (chunk_size_sz > 0) { + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + + if (!route_path->ins) { + continue; + } + + if (flb_routes_mask_get_bit(chunk->routes_mask, + route_path->ins->id, + ins->config) == 0) { + continue; + } + + if (route_path->route == payload->route) { + continue; + } + + if (route_path->ins->total_limit_size != -1) { + if (route_path->ins->fs_chunks_size > chunk_size_sz) { + route_path->ins->fs_chunks_size -= chunk_size_sz; + } + else { + route_path->ins->fs_chunks_size = 0; + } + } + + flb_routes_mask_clear_bit(chunk->routes_mask, + route_path->ins->id, + ins->config); + } + } + memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * ins->config->route_mask_size); cfl_list_foreach(head, &ins->routes_direct) { route_path = cfl_list_entry(head, struct flb_router_path, _head); From fde4e06e2e514908b6b1fbe02e2e51bf7de9a71b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 30 Oct 2025 11:07:14 -0600 Subject: [PATCH 27/30] input_log: remove debug messages and cleanup Signed-off-by: Eduardo Silva --- src/flb_input_log.c | 28 +++------------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 8dbfb398f45..ae06d079379 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -324,8 +324,9 @@ static int build_payload_for_route(struct flb_route_payload *payload, uint8_t *matched_non_default) { size_t i; - int matched; int ret; + int condition_result; + int matched; struct flb_condition *compiled; struct flb_log_event_encoder *encoder; @@ -333,20 +334,12 @@ static int build_payload_for_route(struct flb_route_payload *payload, return -1; } - flb_info("[router] build_payload_for_route called for route '%s' with %zu records", - payload->route->name, record_count); - - flb_info("[router] route condition: %p, per_record_routing: %d", - payload->route->condition, payload->route->per_record_routing); compiled = flb_router_route_get_condition(payload->route); if (!compiled) { - flb_info("[router] no compiled condition found for route '%s'", payload->route->name); return 0; } - flb_info("[router] compiled condition found for route '%s'", payload->route->name); - encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); if (!encoder) { return -1; @@ -355,19 +348,11 @@ static int build_payload_for_route(struct flb_route_payload *payload, matched = 0; for (i = 0; i < record_count; i++) { - int condition_result = flb_condition_evaluate(compiled, records[i]); - flb_info("[router] route '%s' record %zu: condition_result=%d", - payload->route->name, i, condition_result); - + condition_result = flb_condition_evaluate(compiled, records[i]); if (condition_result != FLB_TRUE) { - flb_info("[router] route '%s' record %zu: condition failed, skipping", - payload->route->name, i); continue; } - flb_info("[router] route '%s' record %zu: condition matched, adding to payload", - payload->route->name, i); - ret = encode_chunk_record(encoder, records[i]); if (ret != 0) { flb_log_event_encoder_destroy(encoder); @@ -423,22 +408,15 @@ static int build_payload_for_default_route(struct flb_route_payload *payload, matched = 0; for (i = 0; i < record_count; i++) { - flb_info("[router] default route record %zu: matched_non_default[%zu]=%d", - i, i, matched_non_default[i]); - if (matched_non_default[i]) { - flb_info("[router] default route record %zu: already matched, skipping", i); continue; } if (compiled && flb_condition_evaluate(compiled, records[i]) != FLB_TRUE) { - flb_info("[router] default route record %zu: condition failed, skipping", i); continue; } - flb_info("[router] default route record %zu: adding to default payload", i); - ret = encode_chunk_record(encoder, records[i]); if (ret != 0) { flb_log_event_encoder_destroy(encoder); From 8d2105f3a6bb5ccbb166bc71d5bc093a24287c67 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 30 Oct 2025 11:38:04 -0600 Subject: [PATCH 28/30] routes_mask: correct memcmp byte count in flb_routes_mask_is_empty Signed-off-by: Eduardo Silva --- src/flb_routes_mask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index e10ad9a463a..4ff295871a3 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -144,7 +144,7 @@ int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, { return memcmp(routes_mask, config->route_empty_mask, - config->route_mask_size) == 0; + config->route_mask_size * sizeof(flb_route_mask_element)) == 0; } int flb_routes_empty_mask_create(struct flb_config *config) From cb2ce79249997edce8c39b7026d8acd74708928a Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 30 Oct 2025 12:00:59 -0600 Subject: [PATCH 29/30] router_config: remove debug message Signed-off-by: Eduardo Silva --- src/flb_router_config.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/flb_router_config.c b/src/flb_router_config.c index 098b79018c5..da2da77ba6f 100644 --- a/src/flb_router_config.c +++ b/src/flb_router_config.c @@ -898,7 +898,6 @@ static int parse_route(struct cfl_variant *variant, int val; if (variant_to_bool(per_record_var, &val) == 0) { route->per_record_routing = val; - flb_info("[router] route '%s' per_record_routing=%d", route->name, val); } } From 90bfb20d8f406ea7c1b65b39633a2d1c711d80da Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 30 Oct 2025 12:01:10 -0600 Subject: [PATCH 30/30] task: remove routes_mask check from direct-route branch Signed-off-by: Eduardo Silva --- src/flb_task.c | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/flb_task.c b/src/flb_task.c index ca5e18d3289..a829b73abf3 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -440,17 +440,6 @@ struct flb_task *flb_task_create(uint64_t ref_id, o_ins = route_path->ins; - if (task_ic->routes_mask && - flb_routes_mask_get_bit(task_ic->routes_mask, - o_ins->id, - o_ins->config) == 0) { - flb_debug("[task] skipping direct route for task=%p id=%i output=%s", - task, - task->id, - flb_output_name(o_ins)); - continue; - } - route = flb_calloc(1, sizeof(struct flb_task_route)); if (!route) { flb_errno();