Skip to content

Commit 32d119a

Browse files
committed
router: add context support for conditional routing
This patch adds context variable access support to router condition evaluation and configuration. Signed-off-by: Eduardo Silva <[email protected]>
1 parent 439b5d4 commit 32d119a

File tree

3 files changed

+200
-4
lines changed

3 files changed

+200
-4
lines changed

include/fluent-bit/flb_router.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ struct flb_route_condition_rule {
8888
flb_sds_t value;
8989
flb_sds_t *values;
9090
size_t values_count;
91+
enum record_context_type context;
9192
struct cfl_list _head;
9293
};
9394

src/flb_router_condition.c

Lines changed: 113 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,121 @@
2424
#include <fluent-bit/flb_log_event_encoder.h>
2525
#include <fluent-bit/flb_log_event_decoder.h>
2626
#include <fluent-bit/flb_mp_chunk.h>
27+
#include <cfl/cfl_kvlist.h>
2728

2829
#define FLB_ROUTE_CONDITION_COMPILED_SUCCESS 1
2930
#define FLB_ROUTE_CONDITION_COMPILED_FAILURE -1
3031

3132
static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition);
3233

34+
static inline struct cfl_variant *get_object_variant(struct cfl_object *object)
35+
{
36+
if (!object) {
37+
return NULL;
38+
}
39+
40+
return object->variant;
41+
}
42+
43+
static inline struct cfl_variant *get_body_variant(struct flb_mp_chunk_record *record)
44+
{
45+
if (!record || !record->cobj_record) {
46+
return NULL;
47+
}
48+
49+
return record->cobj_record->variant;
50+
}
51+
52+
static struct cfl_variant *get_otel_container_variant(struct flb_mp_chunk_record *record,
53+
const char *key)
54+
{
55+
struct cfl_variant *body;
56+
struct cfl_variant *container;
57+
58+
body = get_body_variant(record);
59+
if (!body || body->type != CFL_VARIANT_KVLIST) {
60+
return NULL;
61+
}
62+
63+
container = cfl_kvlist_fetch(body->data.as_kvlist, key);
64+
if (!container || container->type != CFL_VARIANT_KVLIST) {
65+
return NULL;
66+
}
67+
68+
return container;
69+
}
70+
71+
static struct cfl_variant *get_otel_attributes_variant(struct flb_mp_chunk_record *record,
72+
enum record_context_type context_type)
73+
{
74+
struct cfl_variant *container;
75+
const char *container_key = NULL;
76+
77+
if (context_type == RECORD_CONTEXT_OTEL_RESOURCE_ATTRIBUTES) {
78+
container_key = "resource";
79+
}
80+
else if (context_type == RECORD_CONTEXT_OTEL_SCOPE_ATTRIBUTES) {
81+
container_key = "scope";
82+
}
83+
else {
84+
return NULL;
85+
}
86+
87+
container = get_otel_container_variant(record, container_key);
88+
if (!container) {
89+
return NULL;
90+
}
91+
92+
container = cfl_kvlist_fetch(container->data.as_kvlist, "attributes");
93+
if (!container || container->type != CFL_VARIANT_KVLIST) {
94+
return NULL;
95+
}
96+
97+
return container;
98+
}
99+
100+
static struct cfl_variant *get_otel_scope_metadata_variant(struct flb_mp_chunk_record *record)
101+
{
102+
struct cfl_variant *scope;
103+
104+
scope = get_otel_container_variant(record, "scope");
105+
if (!scope || scope->type != CFL_VARIANT_KVLIST) {
106+
return NULL;
107+
}
108+
109+
return scope;
110+
}
111+
112+
static struct cfl_variant *route_logs_get_variant(struct flb_condition_rule *rule,
113+
void *ctx)
114+
{
115+
struct flb_mp_chunk_record *record = (struct flb_mp_chunk_record *) ctx;
116+
117+
if (!rule || !record) {
118+
return NULL;
119+
}
120+
121+
switch (rule->context) {
122+
case RECORD_CONTEXT_METADATA:
123+
return get_object_variant(record->cobj_metadata);
124+
case RECORD_CONTEXT_BODY:
125+
return get_body_variant(record);
126+
case RECORD_CONTEXT_GROUP_METADATA:
127+
return get_object_variant(record->cobj_group_metadata);
128+
case RECORD_CONTEXT_GROUP_ATTRIBUTES:
129+
return get_object_variant(record->cobj_group_attributes);
130+
case RECORD_CONTEXT_OTEL_RESOURCE_ATTRIBUTES:
131+
case RECORD_CONTEXT_OTEL_SCOPE_ATTRIBUTES:
132+
return get_otel_attributes_variant(record, rule->context);
133+
case RECORD_CONTEXT_OTEL_SCOPE_METADATA:
134+
return get_otel_scope_metadata_variant(record);
135+
default:
136+
break;
137+
}
138+
139+
return NULL;
140+
}
141+
33142
int flb_router_chunk_context_init(struct flb_router_chunk_context *context)
34143
{
35144
if (!context) {
@@ -185,7 +294,7 @@ int flb_condition_eval_logs(struct flb_event_chunk *chunk,
185294
cfl_list_foreach(head, &context->chunk_cobj->records) {
186295
record = cfl_list_entry(head, struct flb_mp_chunk_record, _head);
187296

188-
if (flb_condition_evaluate(compiled, record) == FLB_TRUE) {
297+
if (flb_condition_evaluate_ex(compiled, record, route_logs_get_variant) == FLB_TRUE) {
189298
result = FLB_TRUE;
190299
break;
191300
}
@@ -391,7 +500,7 @@ static struct flb_condition *route_condition_compile(struct flb_route_condition
391500
return NULL;
392501
}
393502
ret = flb_condition_add_rule(compiled, rule->field, op,
394-
rule->value, 1, RECORD_CONTEXT_BODY);
503+
rule->value, 1, rule->context);
395504
break;
396505
case FLB_RULE_OP_GT:
397506
case FLB_RULE_OP_LT:
@@ -406,7 +515,7 @@ static struct flb_condition *route_condition_compile(struct flb_route_condition
406515
return NULL;
407516
}
408517
ret = flb_condition_add_rule(compiled, rule->field, op,
409-
&numeric_value, 1, RECORD_CONTEXT_BODY);
518+
&numeric_value, 1, rule->context);
410519
break;
411520
case FLB_RULE_OP_IN:
412521
case FLB_RULE_OP_NOT_IN:
@@ -417,7 +526,7 @@ static struct flb_condition *route_condition_compile(struct flb_route_condition
417526
ret = flb_condition_add_rule(compiled, rule->field, op,
418527
rule->values,
419528
(int) rule->values_count,
420-
RECORD_CONTEXT_BODY);
529+
rule->context);
421530
break;
422531
default:
423532
flb_condition_destroy(compiled);

src/flb_router_config.c

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,13 +515,72 @@ static int parse_processors(struct cfl_variant *variant,
515515
return 0;
516516
}
517517

518+
static int parse_condition_rule_context(const char *value,
519+
enum record_context_type *out_context)
520+
{
521+
if (!out_context) {
522+
return -1;
523+
}
524+
525+
if (!value) {
526+
*out_context = RECORD_CONTEXT_BODY;
527+
return 0;
528+
}
529+
530+
if (strcasecmp(value, "metadata") == 0 ||
531+
strcasecmp(value, "record_metadata") == 0 ||
532+
strcasecmp(value, "attributes") == 0) {
533+
*out_context = RECORD_CONTEXT_METADATA;
534+
return 0;
535+
}
536+
537+
if (strcasecmp(value, "body") == 0 ||
538+
strcasecmp(value, "record") == 0 ||
539+
strcasecmp(value, "message") == 0 ||
540+
strcasecmp(value, "record_body") == 0) {
541+
*out_context = RECORD_CONTEXT_BODY;
542+
return 0;
543+
}
544+
545+
if (strcasecmp(value, "group_metadata") == 0) {
546+
*out_context = RECORD_CONTEXT_GROUP_METADATA;
547+
return 0;
548+
}
549+
550+
if (strcasecmp(value, "group_attributes") == 0 ||
551+
strcasecmp(value, "group_body") == 0) {
552+
*out_context = RECORD_CONTEXT_GROUP_ATTRIBUTES;
553+
return 0;
554+
}
555+
556+
if (strcasecmp(value, "otel_resource_attributes") == 0) {
557+
*out_context = RECORD_CONTEXT_OTEL_RESOURCE_ATTRIBUTES;
558+
return 0;
559+
}
560+
561+
if (strcasecmp(value, "otel_scope_attributes") == 0) {
562+
*out_context = RECORD_CONTEXT_OTEL_SCOPE_ATTRIBUTES;
563+
return 0;
564+
}
565+
566+
if (strcasecmp(value, "otel_scope_name") == 0 ||
567+
strcasecmp(value, "otel_scope_version") == 0 ||
568+
strcasecmp(value, "otel_scope_metadata") == 0) {
569+
*out_context = RECORD_CONTEXT_OTEL_SCOPE_METADATA;
570+
return 0;
571+
}
572+
573+
return -1;
574+
}
575+
518576
static struct flb_route_condition_rule *parse_condition_rule(struct cfl_variant *variant)
519577
{
520578
struct flb_route_condition_rule *rule;
521579
struct cfl_kvlist *kvlist;
522580
struct cfl_variant *field_var;
523581
struct cfl_variant *op_var;
524582
struct cfl_variant *value_var;
583+
struct cfl_variant *context_var;
525584

526585
if (!variant || variant->type != CFL_VARIANT_KVLIST) {
527586
return NULL;
@@ -547,6 +606,7 @@ static struct flb_route_condition_rule *parse_condition_rule(struct cfl_variant
547606
cfl_list_init(&rule->_head);
548607
rule->values = NULL;
549608
rule->values_count = 0;
609+
rule->context = RECORD_CONTEXT_BODY;
550610

551611
rule->field = copy_from_cfl_sds(field_var->data.as_string);
552612
if (!rule->field) {
@@ -618,6 +678,32 @@ static struct flb_route_condition_rule *parse_condition_rule(struct cfl_variant
618678
}
619679
}
620680

681+
context_var = cfl_kvlist_fetch(kvlist, "context");
682+
if (context_var) {
683+
if (context_var->type != CFL_VARIANT_STRING ||
684+
parse_condition_rule_context(context_var->data.as_string, &rule->context) != 0) {
685+
size_t j;
686+
687+
if (rule->values) {
688+
for (j = 0; j < rule->values_count; j++) {
689+
if (rule->values[j]) {
690+
flb_sds_destroy(rule->values[j]);
691+
}
692+
}
693+
flb_free(rule->values);
694+
}
695+
696+
if (rule->value) {
697+
flb_sds_destroy(rule->value);
698+
}
699+
700+
flb_sds_destroy(rule->op);
701+
flb_sds_destroy(rule->field);
702+
flb_free(rule);
703+
return NULL;
704+
}
705+
}
706+
621707
return rule;
622708
}
623709

0 commit comments

Comments
 (0)