Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
33cdbbe
router: implement new router paths and conditions
edsiper Oct 20, 2025
ac42f67
router_config: add support for rules
edsiper Oct 20, 2025
839a397
router_condition: add conditional logic for logs
edsiper Oct 20, 2025
3e147e1
task: add handling for direct route
edsiper Oct 20, 2025
a4ace4e
tests: internal: router_config: extend test for conditional logs
edsiper Oct 20, 2025
947c874
wip
edsiper Oct 20, 2025
840e2ba
wip
edsiper Oct 21, 2025
10058f2
wip
edsiper Oct 21, 2025
eb798a7
wip
edsiper Oct 21, 2025
e662be2
wip
edsiper Oct 21, 2025
38b010a
router: use cfl_list for router path
edsiper Oct 27, 2025
966387c
input_log: code cleanup and fix types for linked lists
edsiper Oct 27, 2025
b923d4a
input: fix data type for routes_direct
edsiper Oct 27, 2025
4163712
router_config: fix data type for linked list
edsiper Oct 27, 2025
55f9e0e
task: fix data type for linked list
edsiper Oct 27, 2025
2c2a5cf
input_log: preserve non-conditional routes when conditional routing i…
edsiper Oct 27, 2025
20eeb44
input_log: handle deferred chunk creation in threaded inputs with con…
edsiper Oct 27, 2025
49e41fe
input: include cfl header
edsiper Oct 27, 2025
b727628
router: use cfl_list intead of mk_list
edsiper Oct 27, 2025
28946e8
input: fix llist initialization
edsiper Oct 27, 2025
4a82d5d
sosreport: fix list iterator api
edsiper Oct 27, 2025
cb67d1c
tests: internal: router_config: fix lists API
edsiper Oct 27, 2025
c7949cc
input_chunk: expose flb_input_chunk_get_real_size()
edsiper Oct 28, 2025
ac76b20
tests: internal: input_chunk: remove unused code
edsiper Oct 28, 2025
a7e4c92
input_log: implement conditional routing with non-conditional route p…
edsiper Oct 28, 2025
8183d93
input_log: recompute per-output storage accounting
edsiper Oct 28, 2025
fde4e06
input_log: remove debug messages and cleanup
edsiper Oct 30, 2025
8d2105f
routes_mask: correct memcmp byte count in flb_routes_mask_is_empty
edsiper Oct 30, 2025
cb2ce79
router_config: remove debug message
edsiper Oct 30, 2025
90bfb20
task: remove routes_mask check from direct-route branch
edsiper Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/fluent-bit/flb_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_conditionals.h>
#include <cfl/cfl.h>
#include <monkey/mk_core.h>

struct flb_router_path {
struct flb_output_instance *ins;
struct flb_route *route;
struct mk_list _head;
};

Expand Down Expand Up @@ -74,12 +76,17 @@ struct flb_route_condition_rule {
flb_sds_t field;
flb_sds_t op;
flb_sds_t value;
flb_sds_t *values;
size_t values_count;
struct cfl_list _head;
};

struct flb_route_condition {
struct cfl_list rules;
int is_default;
enum flb_condition_operator op;
struct flb_condition *compiled;
int compiled_status;
};

struct flb_route_output {
Expand Down Expand Up @@ -136,6 +143,8 @@ int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
struct flb_route *route);
int flb_condition_eval_traces(struct flb_event_chunk *chunk,
struct flb_route *route);
int flb_router_path_should_route(struct flb_event_chunk *chunk,
struct flb_router_path *path);

struct flb_cf;

Expand Down
2 changes: 2 additions & 0 deletions src/flb_router.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ int flb_router_connect(struct flb_input_instance *in,
}

p->ins = out;
p->route = NULL;
mk_list_add(&p->_head, &in->routes);

return 0;
Expand All @@ -172,6 +173,7 @@ int flb_router_connect_direct(struct flb_input_instance *in,
}

p->ins = out;
p->route = NULL;
mk_list_add(&p->_head, &in->routes_direct);

return 0;
Expand Down
287 changes: 270 additions & 17 deletions src/flb_router_condition.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_conditionals.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_mp_chunk.h>

Comment on lines 20 to 27
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add missing standard headers for strcasecmp/strtod/errno

This file uses strcasecmp, strtod and errno but doesn’t include their headers. Some platforms won’t get these via transitive includes, causing build failures.

Apply:

 #include <fluent-bit/flb_mp_chunk.h>
 
+#include <strings.h>  /* strcasecmp */
+#include <stdlib.h>   /* strtod */
+#include <errno.h>    /* errno, ERANGE */
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_conditionals.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_mp_chunk.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_conditionals.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_mp_chunk.h>
#include <strings.h> /* strcasecmp */
#include <stdlib.h> /* strtod */
#include <errno.h> /* errno, ERANGE */
🤖 Prompt for AI Agents
In src/flb_router_condition.c around lines 20 to 27, the file uses strcasecmp,
strtod and errno but lacks their standard headers; add the missing includes at
the top with the other headers: include <string.h> for strcasecmp, <stdlib.h>
for strtod, and <errno.h> for errno so builds on platforms without transitive
includes succeed.

#define FLB_ROUTE_CONDITION_COMPILED_SUCCESS 1
#define FLB_ROUTE_CONDITION_COMPILED_FAILURE -1

static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition);
static void route_condition_record_destroy(struct flb_mp_chunk_record *record);

uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk)
{
Expand All @@ -44,18 +54,68 @@ uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk)
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
struct flb_route *route)
{
(void) chunk;
(void) route;
int ret;
int result = FLB_FALSE;
struct flb_route_condition *condition;
struct flb_condition *compiled;
struct flb_log_event_decoder decoder;
struct flb_log_event event;
struct flb_mp_chunk_record record;

/*
* The full condition evaluation engine requires field resolvers that map
* record accessors to the different telemetry payload shapes. The wiring
* of those resolvers is part of a bigger effort and will be implemented in
* follow-up changes. For the time being we simply report that the
* condition failed so that the runtime can rely on explicit default
* routes.
*/
return FLB_FALSE;
if (!chunk || !route || !route->condition) {
return FLB_FALSE;
}

if (!chunk->data || chunk->size == 0) {
return FLB_FALSE;
}

condition = route->condition;

compiled = route_condition_get_compiled(condition);
if (!compiled) {
return FLB_FALSE;
}

ret = flb_log_event_decoder_init(&decoder, chunk->data, chunk->size);
if (ret != FLB_EVENT_DECODER_SUCCESS) {
return FLB_FALSE;
}

flb_log_event_decoder_read_groups(&decoder, FLB_TRUE);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid reading group meta records; skip them and check the return

Setting read_groups = TRUE surfaces group open/close meta records which can be (incorrectly) evaluated by conditions. Decoder already propagates group context to normal records without needing read_groups = TRUE.

- flb_log_event_decoder_read_groups(&decoder, FLB_TRUE);
+ if (flb_log_event_decoder_read_groups(&decoder, FLB_FALSE) != 0) {
+     flb_log_event_decoder_destroy(&decoder);
+     return FLB_FALSE;
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
flb_log_event_decoder_read_groups(&decoder, FLB_TRUE);
if (flb_log_event_decoder_read_groups(&decoder, FLB_FALSE) != 0) {
flb_log_event_decoder_destroy(&decoder);
return FLB_FALSE;
}
🤖 Prompt for AI Agents
In src/flb_router_condition.c around lines 85-86, the call to
flb_log_event_decoder_read_groups(&decoder, FLB_TRUE) causes group open/close
meta records to be surfaced and wrongly evaluated; change this to disable
reading group meta records (use FLB_FALSE / remove the read_groups enable) so
the decoder only propagates group context to normal records, and check the
function's return value for errors—handle a non-success return by logging an
error and returning/propagating failure appropriately.

while ((ret = flb_log_event_decoder_next(&decoder, &event)) == FLB_EVENT_DECODER_SUCCESS) {
memset(&record, 0, sizeof(record));
record.event = event;

if (event.metadata) {
record.cobj_metadata = flb_mp_object_to_cfl(event.metadata);
if (!record.cobj_metadata) {
route_condition_record_destroy(&record);
break;
}
}

if (event.body) {
record.cobj_record = flb_mp_object_to_cfl(event.body);
if (!record.cobj_record) {
route_condition_record_destroy(&record);
break;
}
}

if (flb_condition_evaluate(compiled, &record) == FLB_TRUE) {
result = FLB_TRUE;
route_condition_record_destroy(&record);
break;
}

route_condition_record_destroy(&record);
}

flb_log_event_decoder_destroy(&decoder);

return result;
}

int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
Expand Down Expand Up @@ -94,19 +154,14 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk,
return FLB_FALSE;
}

if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) &&
((route->signals & signal) == 0)) {
if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && ((route->signals & signal) == 0)) {
return FLB_FALSE;
}

if (route->condition->is_default) {
return FLB_TRUE;
}

if (cfl_list_is_empty(&route->condition->rules) == 0) {
return FLB_TRUE;
}

switch (signal) {
case FLB_ROUTER_SIGNAL_LOGS:
return flb_condition_eval_logs(chunk, route);
Expand All @@ -121,3 +176,201 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk,
return FLB_FALSE;
}

int flb_router_path_should_route(struct flb_event_chunk *chunk,
struct flb_router_path *path)
{
if (!path) {
return FLB_FALSE;
}

if (!path->route) {
return FLB_TRUE;
}

return flb_route_condition_eval(chunk, path->route);
}

static int parse_rule_operator(const flb_sds_t op_str,
enum flb_rule_operator *out)
{
if (!op_str || !out) {
return -1;
}

if (strcasecmp(op_str, "eq") == 0) {
*out = FLB_RULE_OP_EQ;
}
else if (strcasecmp(op_str, "neq") == 0) {
*out = FLB_RULE_OP_NEQ;
}
else if (strcasecmp(op_str, "gt") == 0) {
*out = FLB_RULE_OP_GT;
}
else if (strcasecmp(op_str, "lt") == 0) {
*out = FLB_RULE_OP_LT;
}
else if (strcasecmp(op_str, "gte") == 0) {
*out = FLB_RULE_OP_GTE;
}
else if (strcasecmp(op_str, "lte") == 0) {
*out = FLB_RULE_OP_LTE;
}
else if (strcasecmp(op_str, "regex") == 0) {
*out = FLB_RULE_OP_REGEX;
}
else if (strcasecmp(op_str, "not_regex") == 0) {
*out = FLB_RULE_OP_NOT_REGEX;
}
else if (strcasecmp(op_str, "in") == 0) {
*out = FLB_RULE_OP_IN;
}
else if (strcasecmp(op_str, "not_in") == 0) {
*out = FLB_RULE_OP_NOT_IN;
}
else {
return -1;
}

return 0;
}

static int parse_numeric_value(flb_sds_t value, double *out)
{
char *endptr = NULL;
double result;

if (!value || !out) {
return -1;
}

errno = 0;
result = strtod(value, &endptr);
if (errno == ERANGE || endptr == value || (endptr && *endptr != '\0')) {
return -1;
}

*out = result;
return 0;
}

static struct flb_condition *route_condition_compile(struct flb_route_condition *condition)
{
int ret;
double numeric_value;
enum flb_rule_operator op;
struct cfl_list *head;
struct flb_condition *compiled;
struct flb_route_condition_rule *rule;

compiled = flb_condition_create(condition->op);
if (!compiled) {
return NULL;
}

cfl_list_foreach(head, &condition->rules) {
rule = cfl_list_entry(head, struct flb_route_condition_rule, _head);

if (!rule->field || !rule->op) {
flb_condition_destroy(compiled);
return NULL;
}

if (parse_rule_operator(rule->op, &op) != 0) {
flb_condition_destroy(compiled);
return NULL;
}

switch (op) {
case FLB_RULE_OP_EQ:
case FLB_RULE_OP_NEQ:
case FLB_RULE_OP_REGEX:
case FLB_RULE_OP_NOT_REGEX:
if (!rule->value) {
flb_condition_destroy(compiled);
return NULL;
}
ret = flb_condition_add_rule(compiled, rule->field, op,
rule->value, 1, RECORD_CONTEXT_BODY);
break;
case FLB_RULE_OP_GT:
case FLB_RULE_OP_LT:
case FLB_RULE_OP_GTE:
case FLB_RULE_OP_LTE:
if (!rule->value) {
flb_condition_destroy(compiled);
return NULL;
}
if (parse_numeric_value(rule->value, &numeric_value) != 0) {
flb_condition_destroy(compiled);
return NULL;
}
ret = flb_condition_add_rule(compiled, rule->field, op,
&numeric_value, 1, RECORD_CONTEXT_BODY);
break;
case FLB_RULE_OP_IN:
case FLB_RULE_OP_NOT_IN:
if (!rule->values || rule->values_count == 0) {
flb_condition_destroy(compiled);
return NULL;
}
ret = flb_condition_add_rule(compiled, rule->field, op,
rule->values,
(int) rule->values_count,
RECORD_CONTEXT_BODY);
break;
default:
flb_condition_destroy(compiled);
return NULL;
}

if (ret != FLB_TRUE) {
flb_condition_destroy(compiled);
return NULL;
}
}

return compiled;
}

static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition)
{
if (!condition) {
return NULL;
}

if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_FAILURE) {
return NULL;
}

if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_SUCCESS &&
condition->compiled) {
return condition->compiled;
}

condition->compiled = route_condition_compile(condition);
if (!condition->compiled) {
condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_FAILURE;
return NULL;
}

condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_SUCCESS;
return condition->compiled;
}

static void route_condition_record_destroy(struct flb_mp_chunk_record *record)
{
if (!record) {
return;
}

if (record->cobj_record) {
cfl_object_destroy(record->cobj_record);
record->cobj_record = NULL;
}

if (record->cobj_metadata) {
cfl_object_destroy(record->cobj_metadata);
record->cobj_metadata = NULL;
}
}

Loading
Loading