Skip to content

Commit 839a397

Browse files
committed
router_condition: add conditional logic for logs
Signed-off-by: Eduardo Silva <[email protected]>
1 parent ac42f67 commit 839a397

File tree

1 file changed

+270
-17
lines changed

1 file changed

+270
-17
lines changed

src/flb_router_condition.c

Lines changed: 270 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@
2020
#include <fluent-bit/flb_mem.h>
2121
#include <fluent-bit/flb_log.h>
2222
#include <fluent-bit/flb_router.h>
23+
#include <fluent-bit/flb_conditionals.h>
24+
#include <fluent-bit/flb_log_event_decoder.h>
25+
#include <fluent-bit/flb_mp.h>
26+
#include <fluent-bit/flb_mp_chunk.h>
27+
28+
#define FLB_ROUTE_CONDITION_COMPILED_SUCCESS 1
29+
#define FLB_ROUTE_CONDITION_COMPILED_FAILURE -1
30+
31+
static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition);
32+
static void route_condition_record_destroy(struct flb_mp_chunk_record *record);
2333

2434
uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk)
2535
{
@@ -44,18 +54,68 @@ uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk)
4454
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
4555
struct flb_route *route)
4656
{
47-
(void) chunk;
48-
(void) route;
57+
int ret;
58+
int result = FLB_FALSE;
59+
struct flb_route_condition *condition;
60+
struct flb_condition *compiled;
61+
struct flb_log_event_decoder decoder;
62+
struct flb_log_event event;
63+
struct flb_mp_chunk_record record;
4964

50-
/*
51-
* The full condition evaluation engine requires field resolvers that map
52-
* record accessors to the different telemetry payload shapes. The wiring
53-
* of those resolvers is part of a bigger effort and will be implemented in
54-
* follow-up changes. For the time being we simply report that the
55-
* condition failed so that the runtime can rely on explicit default
56-
* routes.
57-
*/
58-
return FLB_FALSE;
65+
if (!chunk || !route || !route->condition) {
66+
return FLB_FALSE;
67+
}
68+
69+
if (!chunk->data || chunk->size == 0) {
70+
return FLB_FALSE;
71+
}
72+
73+
condition = route->condition;
74+
75+
compiled = route_condition_get_compiled(condition);
76+
if (!compiled) {
77+
return FLB_FALSE;
78+
}
79+
80+
ret = flb_log_event_decoder_init(&decoder, chunk->data, chunk->size);
81+
if (ret != FLB_EVENT_DECODER_SUCCESS) {
82+
return FLB_FALSE;
83+
}
84+
85+
flb_log_event_decoder_read_groups(&decoder, FLB_TRUE);
86+
87+
while ((ret = flb_log_event_decoder_next(&decoder, &event)) == FLB_EVENT_DECODER_SUCCESS) {
88+
memset(&record, 0, sizeof(record));
89+
record.event = event;
90+
91+
if (event.metadata) {
92+
record.cobj_metadata = flb_mp_object_to_cfl(event.metadata);
93+
if (!record.cobj_metadata) {
94+
route_condition_record_destroy(&record);
95+
break;
96+
}
97+
}
98+
99+
if (event.body) {
100+
record.cobj_record = flb_mp_object_to_cfl(event.body);
101+
if (!record.cobj_record) {
102+
route_condition_record_destroy(&record);
103+
break;
104+
}
105+
}
106+
107+
if (flb_condition_evaluate(compiled, &record) == FLB_TRUE) {
108+
result = FLB_TRUE;
109+
route_condition_record_destroy(&record);
110+
break;
111+
}
112+
113+
route_condition_record_destroy(&record);
114+
}
115+
116+
flb_log_event_decoder_destroy(&decoder);
117+
118+
return result;
59119
}
60120

61121
int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
@@ -94,19 +154,14 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk,
94154
return FLB_FALSE;
95155
}
96156

97-
if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) &&
98-
((route->signals & signal) == 0)) {
157+
if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && ((route->signals & signal) == 0)) {
99158
return FLB_FALSE;
100159
}
101160

102161
if (route->condition->is_default) {
103162
return FLB_TRUE;
104163
}
105164

106-
if (cfl_list_is_empty(&route->condition->rules) == 0) {
107-
return FLB_TRUE;
108-
}
109-
110165
switch (signal) {
111166
case FLB_ROUTER_SIGNAL_LOGS:
112167
return flb_condition_eval_logs(chunk, route);
@@ -121,3 +176,201 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk,
121176
return FLB_FALSE;
122177
}
123178

179+
int flb_router_path_should_route(struct flb_event_chunk *chunk,
180+
struct flb_router_path *path)
181+
{
182+
if (!path) {
183+
return FLB_FALSE;
184+
}
185+
186+
if (!path->route) {
187+
return FLB_TRUE;
188+
}
189+
190+
return flb_route_condition_eval(chunk, path->route);
191+
}
192+
193+
static int parse_rule_operator(const flb_sds_t op_str,
194+
enum flb_rule_operator *out)
195+
{
196+
if (!op_str || !out) {
197+
return -1;
198+
}
199+
200+
if (strcasecmp(op_str, "eq") == 0) {
201+
*out = FLB_RULE_OP_EQ;
202+
}
203+
else if (strcasecmp(op_str, "neq") == 0) {
204+
*out = FLB_RULE_OP_NEQ;
205+
}
206+
else if (strcasecmp(op_str, "gt") == 0) {
207+
*out = FLB_RULE_OP_GT;
208+
}
209+
else if (strcasecmp(op_str, "lt") == 0) {
210+
*out = FLB_RULE_OP_LT;
211+
}
212+
else if (strcasecmp(op_str, "gte") == 0) {
213+
*out = FLB_RULE_OP_GTE;
214+
}
215+
else if (strcasecmp(op_str, "lte") == 0) {
216+
*out = FLB_RULE_OP_LTE;
217+
}
218+
else if (strcasecmp(op_str, "regex") == 0) {
219+
*out = FLB_RULE_OP_REGEX;
220+
}
221+
else if (strcasecmp(op_str, "not_regex") == 0) {
222+
*out = FLB_RULE_OP_NOT_REGEX;
223+
}
224+
else if (strcasecmp(op_str, "in") == 0) {
225+
*out = FLB_RULE_OP_IN;
226+
}
227+
else if (strcasecmp(op_str, "not_in") == 0) {
228+
*out = FLB_RULE_OP_NOT_IN;
229+
}
230+
else {
231+
return -1;
232+
}
233+
234+
return 0;
235+
}
236+
237+
static int parse_numeric_value(flb_sds_t value, double *out)
238+
{
239+
char *endptr = NULL;
240+
double result;
241+
242+
if (!value || !out) {
243+
return -1;
244+
}
245+
246+
errno = 0;
247+
result = strtod(value, &endptr);
248+
if (errno == ERANGE || endptr == value || (endptr && *endptr != '\0')) {
249+
return -1;
250+
}
251+
252+
*out = result;
253+
return 0;
254+
}
255+
256+
static struct flb_condition *route_condition_compile(struct flb_route_condition *condition)
257+
{
258+
int ret;
259+
double numeric_value;
260+
enum flb_rule_operator op;
261+
struct cfl_list *head;
262+
struct flb_condition *compiled;
263+
struct flb_route_condition_rule *rule;
264+
265+
compiled = flb_condition_create(condition->op);
266+
if (!compiled) {
267+
return NULL;
268+
}
269+
270+
cfl_list_foreach(head, &condition->rules) {
271+
rule = cfl_list_entry(head, struct flb_route_condition_rule, _head);
272+
273+
if (!rule->field || !rule->op) {
274+
flb_condition_destroy(compiled);
275+
return NULL;
276+
}
277+
278+
if (parse_rule_operator(rule->op, &op) != 0) {
279+
flb_condition_destroy(compiled);
280+
return NULL;
281+
}
282+
283+
switch (op) {
284+
case FLB_RULE_OP_EQ:
285+
case FLB_RULE_OP_NEQ:
286+
case FLB_RULE_OP_REGEX:
287+
case FLB_RULE_OP_NOT_REGEX:
288+
if (!rule->value) {
289+
flb_condition_destroy(compiled);
290+
return NULL;
291+
}
292+
ret = flb_condition_add_rule(compiled, rule->field, op,
293+
rule->value, 1, RECORD_CONTEXT_BODY);
294+
break;
295+
case FLB_RULE_OP_GT:
296+
case FLB_RULE_OP_LT:
297+
case FLB_RULE_OP_GTE:
298+
case FLB_RULE_OP_LTE:
299+
if (!rule->value) {
300+
flb_condition_destroy(compiled);
301+
return NULL;
302+
}
303+
if (parse_numeric_value(rule->value, &numeric_value) != 0) {
304+
flb_condition_destroy(compiled);
305+
return NULL;
306+
}
307+
ret = flb_condition_add_rule(compiled, rule->field, op,
308+
&numeric_value, 1, RECORD_CONTEXT_BODY);
309+
break;
310+
case FLB_RULE_OP_IN:
311+
case FLB_RULE_OP_NOT_IN:
312+
if (!rule->values || rule->values_count == 0) {
313+
flb_condition_destroy(compiled);
314+
return NULL;
315+
}
316+
ret = flb_condition_add_rule(compiled, rule->field, op,
317+
rule->values,
318+
(int) rule->values_count,
319+
RECORD_CONTEXT_BODY);
320+
break;
321+
default:
322+
flb_condition_destroy(compiled);
323+
return NULL;
324+
}
325+
326+
if (ret != FLB_TRUE) {
327+
flb_condition_destroy(compiled);
328+
return NULL;
329+
}
330+
}
331+
332+
return compiled;
333+
}
334+
335+
static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition)
336+
{
337+
if (!condition) {
338+
return NULL;
339+
}
340+
341+
if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_FAILURE) {
342+
return NULL;
343+
}
344+
345+
if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_SUCCESS &&
346+
condition->compiled) {
347+
return condition->compiled;
348+
}
349+
350+
condition->compiled = route_condition_compile(condition);
351+
if (!condition->compiled) {
352+
condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_FAILURE;
353+
return NULL;
354+
}
355+
356+
condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_SUCCESS;
357+
return condition->compiled;
358+
}
359+
360+
static void route_condition_record_destroy(struct flb_mp_chunk_record *record)
361+
{
362+
if (!record) {
363+
return;
364+
}
365+
366+
if (record->cobj_record) {
367+
cfl_object_destroy(record->cobj_record);
368+
record->cobj_record = NULL;
369+
}
370+
371+
if (record->cobj_metadata) {
372+
cfl_object_destroy(record->cobj_metadata);
373+
record->cobj_metadata = NULL;
374+
}
375+
}
376+

0 commit comments

Comments
 (0)