Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ab8e04a
conditionals: add context support for routing conditions
edsiper Oct 31, 2025
7eaae5b
router: add context support for conditional routing
edsiper Oct 31, 2025
f2756b9
tests: internal: router_config: add context routing tests
edsiper Oct 31, 2025
c035a56
router: implement context variable resolution
edsiper Oct 31, 2025
9483559
input_log: add context extraction from log records
edsiper Oct 31, 2025
8bf492f
log_event_decoder: add context extraction helpers
edsiper Oct 31, 2025
91469f2
task: pass context to router condition evaluation
edsiper Oct 31, 2025
688e67d
processor_content_modifier: skip GROUP_START markers during metadata/…
edsiper Oct 31, 2025
7c9842a
input_log: improve context extraction and routing
edsiper Oct 31, 2025
3d46ee6
lib: add context support API for routing
edsiper Oct 31, 2025
c0581f4
tests: runtime: in_opentelemetry_routing: add context tests
edsiper Oct 31, 2025
6d52998
input_log: fix context handling edge cases
edsiper Oct 31, 2025
a9b8f92
tests: internal: router_config: fix test assertions
edsiper Oct 31, 2025
aaf61d1
tests: internal: conditional_routing: add context tests
edsiper Oct 31, 2025
c9858a7
tests: data: add context routing configuration examples
edsiper Oct 31, 2025
1ba8487
tests: runtime: in_opentelemetry_routing: enhance tests
edsiper Oct 31, 2025
6f65037
lib: use flb_compat.h instead of unistd.h (Windows issue)
edsiper Nov 1, 2025
6928119
input_log: fix variable declaration
edsiper Nov 1, 2025
9c596c5
conditionals: fix NULL record handling in flb_condition_evaluate
edsiper Nov 1, 2025
a320160
tests: internal: conditionals: NULL record should fail condition
edsiper Nov 1, 2025
1ac9fdd
log_event_decoder: add tests for group marker skipping with corrupted…
edsiper Nov 1, 2025
4d7e494
tests: internal: log_event_decoder: add comprehensive tests for group…
edsiper Nov 1, 2025
e8d2444
log_event_decoder: improve robustness for invalid group markers
edsiper Nov 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions include/fluent-bit/flb_conditionals.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,20 @@
#include <monkey/mk_core.h>
#include <fluent-bit/flb_mp_chunk.h>

struct flb_condition_rule;

typedef struct cfl_variant *(*flb_condition_get_variant_fn)(struct flb_condition_rule *rule,
void *ctx);

/* Context types enum */
enum record_context_type {
RECORD_CONTEXT_BODY = 0,
RECORD_CONTEXT_METADATA = 1
RECORD_CONTEXT_BODY = 0,
RECORD_CONTEXT_METADATA = 1,
RECORD_CONTEXT_GROUP_METADATA,
RECORD_CONTEXT_GROUP_ATTRIBUTES,
RECORD_CONTEXT_OTEL_RESOURCE_ATTRIBUTES,
RECORD_CONTEXT_OTEL_SCOPE_ATTRIBUTES,
RECORD_CONTEXT_OTEL_SCOPE_METADATA
};

struct flb_condition;
Expand Down Expand Up @@ -88,6 +98,9 @@ int flb_condition_add_rule(struct flb_condition *cond,
void flb_condition_destroy(struct flb_condition *cond);

/* Evaluation function */
int flb_condition_evaluate_ex(struct flb_condition *cond,
void *ctx,
flb_condition_get_variant_fn get_variant);
int flb_condition_evaluate(struct flb_condition *cond,
struct flb_mp_chunk_record *record);

Expand Down
11 changes: 11 additions & 0 deletions include/fluent-bit/flb_log_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@
#define FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V1 FLB_LOG_EVENT_FORMAT_FORWARD
#define FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2 4

/*
* Log event type identification via timestamp value:
* - Non-negative timestamps (>= 0): Normal log records with actual timestamps
* - -1 (FLB_LOG_EVENT_GROUP_START): Group marker indicating start of a log group
* - -2 (FLB_LOG_EVENT_GROUP_END): Group marker indicating end of a log group
* - Other negative values: Invalid/corrupted data (will be skipped by decoder)
*
* NOTE: Negative timestamps are RESERVED for group markers. Only -1 and -2 are valid.
* Any other negative timestamp is considered invalid and will be skipped during decoding.
* Encoders must respect this contract and only use -1/-2 for group markers.
*/
#define FLB_LOG_EVENT_NORMAL (int32_t) 0
#define FLB_LOG_EVENT_GROUP_START (int32_t) -1
#define FLB_LOG_EVENT_GROUP_END (int32_t) -2
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_log_event_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct flb_log_event_decoder {
size_t length;
int last_result;
int read_groups;
unsigned int recursion_depth; /* Safety guard for recursion limit */
};

void flb_log_event_decoder_reset(struct flb_log_event_decoder *context,
Expand Down
5 changes: 5 additions & 0 deletions include/fluent-bit/flb_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
struct flb_mp_chunk_cobj;
struct flb_log_event_encoder;
struct flb_log_event_decoder;
struct flb_mp_chunk_record;

struct flb_router_chunk_context {
struct flb_mp_chunk_cobj *chunk_cobj;
Expand Down Expand Up @@ -88,6 +89,7 @@ struct flb_route_condition_rule {
flb_sds_t value;
flb_sds_t *values;
size_t values_count;
enum record_context_type context;
struct cfl_list _head;
};

Expand All @@ -102,6 +104,7 @@ struct flb_route_condition {
struct flb_route_output {
flb_sds_t name;
flb_sds_t fallback;
struct flb_output_instance *ins;
struct cfl_list _head;
};

Expand Down Expand Up @@ -155,6 +158,8 @@ int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *conte
int flb_route_condition_eval(struct flb_event_chunk *chunk,
struct flb_router_chunk_context *context,
struct flb_route *route);
int flb_router_condition_evaluate_record(struct flb_route *route,
struct flb_mp_chunk_record *record);
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
struct flb_router_chunk_context *context,
struct flb_route *route);
Expand Down
6 changes: 6 additions & 0 deletions plugins/processor_content_modifier/cm_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ int cm_logs_process(struct flb_processor_instance *ins,
continue;
}

if (record_type == FLB_LOG_EVENT_GROUP_START &&
(ctx->context_type == CM_CONTEXT_LOG_METADATA ||
ctx->context_type == CM_CONTEXT_LOG_BODY)) {
continue;
}

/* retrieve the target cfl object */
if (ctx->context_type == CM_CONTEXT_LOG_METADATA) {
obj = record->cobj_metadata;
Expand Down
45 changes: 34 additions & 11 deletions src/flb_conditionals.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_conditionals.h>

/* Function to get the record variant based on context */
static inline struct cfl_variant *get_record_variant(struct flb_mp_chunk_record *record,
enum record_context_type context_type)
static struct cfl_variant *default_get_record_variant(struct flb_condition_rule *rule,
void *ctx)
{
if (!record) {
struct flb_mp_chunk_record *record = (struct flb_mp_chunk_record *) ctx;

if (!record || !rule) {
return NULL;
}

switch (context_type) {
switch (rule->context) {
case RECORD_CONTEXT_METADATA:
if (record->cobj_metadata) {
return record->cobj_metadata->variant;
Expand All @@ -45,6 +46,9 @@ static inline struct cfl_variant *get_record_variant(struct flb_mp_chunk_record
return record->cobj_record->variant;
}
break;

default:
break;
}

return NULL;
Expand Down Expand Up @@ -356,8 +360,9 @@ static int evaluate_rule(struct flb_condition_rule *rule,
return result;
}

int flb_condition_evaluate(struct flb_condition *cond,
struct flb_mp_chunk_record *record)
int flb_condition_evaluate_ex(struct flb_condition *cond,
void *ctx,
flb_condition_get_variant_fn get_variant)
{
struct mk_list *head;
struct flb_condition_rule *rule;
Expand All @@ -366,11 +371,16 @@ int flb_condition_evaluate(struct flb_condition *cond,
int any_rule_evaluated = FLB_FALSE;
int any_rule_matched = FLB_FALSE;

if (!cond || !record) {
flb_trace("[condition] NULL condition or record, returning TRUE");
if (!cond) {
flb_trace("[condition] NULL condition, returning TRUE");
return FLB_TRUE;
}

if (!get_variant) {
flb_trace("[condition] missing variant provider, returning FALSE");
return FLB_FALSE;
}

flb_trace("[condition] evaluating condition with %d rules", mk_list_size(&cond->rules));

if (mk_list_size(&cond->rules) == 0) {
Expand All @@ -382,8 +392,7 @@ int flb_condition_evaluate(struct flb_condition *cond,
rule = mk_list_entry(head, struct flb_condition_rule, _head);
flb_trace("[condition] processing rule with op=%d", rule->op);

/* Get the variant for this rule's context */
record_variant = get_record_variant(record, rule->context);
record_variant = get_variant(rule, ctx);
any_rule_evaluated = FLB_TRUE;
if (!record_variant) {
flb_trace("[condition] no record variant found for context %d", rule->context);
Expand Down Expand Up @@ -427,3 +436,17 @@ int flb_condition_evaluate(struct flb_condition *cond,
flb_trace("[condition] final evaluation result: TRUE");
return FLB_TRUE;
}

int flb_condition_evaluate(struct flb_condition *cond,
struct flb_mp_chunk_record *record)
{
if (!cond) {
return FLB_TRUE;
}

if (!record) {
return FLB_FALSE;
}

return flb_condition_evaluate_ex(cond, record, default_get_record_variant);
}
Loading
Loading