Skip to content

Commit f9c1fb7

Browse files
committed
input_log: implement conditional routing with non-conditional route preservation
- Add input_chunk_remove_conditional_routes() to prevent duplicate routing - Preserve non-conditional routes when conditional routing is used - Fix base tag length calculation for proper chunk identification - Ensure conditional routing is additive rather than exclusive Signed-off-by: Eduardo Silva <[email protected]>
1 parent 32ebcda commit f9c1fb7

File tree

1 file changed

+123
-11
lines changed

1 file changed

+123
-11
lines changed

src/flb_input_log.c

Lines changed: 123 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
#include "fluent-bit/flb_pack.h"
2121
#include <fluent-bit/flb_info.h>
22+
#include <fluent-bit/flb_log.h>
2223
#include <fluent-bit/flb_input.h>
2324
#include <fluent-bit/flb_input_chunk.h>
2425
#include <fluent-bit/flb_hash_table.h>
2526
#include <fluent-bit/flb_input_log.h>
2627
#include <fluent-bit/flb_input_plugin.h>
28+
#include <fluent-bit/flb_output.h>
2729
#include <fluent-bit/flb_processor.h>
2830
#include <fluent-bit/flb_router.h>
2931
#include <fluent-bit/flb_routes_mask.h>
@@ -36,6 +38,8 @@
3638
#include <fluent-bit/flb_sds.h>
3739
#include <fluent-bit/flb_mem.h>
3840

41+
#include <chunkio/chunkio.h>
42+
3943
#include <stdint.h>
4044
#include <string.h>
4145

@@ -97,6 +101,7 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins,
97101
int ret;
98102
int routes_found = 0;
99103
size_t out_size = 0;
104+
ssize_t chunk_size;
100105
struct cfl_list *head;
101106
struct flb_input_chunk *chunk = NULL;
102107
struct flb_router_path *route_path;
@@ -115,14 +120,7 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins,
115120
(void **) &chunk,
116121
&out_size);
117122
if (ret == -1 || !chunk || !chunk->routes_mask) {
118-
/* For threaded inputs, chunk may not exist yet - this is expected */
119-
if (flb_input_is_threaded(ins)) {
120-
/* In threaded mode, routing will be handled when chunk is materialized */
121-
flb_plg_debug(ins, "chunk not yet materialized for threaded input, "
122-
"routing will be handled asynchronously");
123-
return 0; /* Success - don't treat as error */
124-
}
125-
return -1; /* Error for non-threaded inputs */
123+
return -1;
126124
}
127125

128126
memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * ins->config->route_mask_size);
@@ -141,6 +139,14 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins,
141139
return -1;
142140
}
143141

142+
if (chunk->fs_counted == FLB_FALSE) {
143+
chunk_size = flb_input_chunk_get_real_size(chunk);
144+
if (chunk_size > 0) {
145+
flb_input_chunk_update_output_instances(chunk,
146+
(size_t) chunk_size);
147+
}
148+
}
149+
144150
return 0;
145151
}
146152

@@ -432,6 +438,65 @@ static void route_payload_list_destroy(struct cfl_list *payloads)
432438
}
433439
}
434440

441+
static void input_chunk_remove_conditional_routes(struct flb_input_instance *ins,
442+
struct flb_input_chunk *chunk)
443+
{
444+
ssize_t chunk_size;
445+
size_t chunk_size_sz;
446+
struct cfl_list *head;
447+
struct flb_router_path *route_path;
448+
449+
if (!ins || !chunk || !chunk->routes_mask || !ins->config) {
450+
return;
451+
}
452+
453+
chunk_size = -1;
454+
cfl_list_foreach(head, &ins->routes_direct) {
455+
route_path = cfl_list_entry(head, struct flb_router_path, _head);
456+
457+
if (!route_path->route || !route_path->ins) {
458+
continue;
459+
}
460+
461+
if (!route_path->route->condition &&
462+
!route_path->route->per_record_routing) {
463+
continue;
464+
}
465+
466+
if (flb_routes_mask_get_bit(chunk->routes_mask,
467+
route_path->ins->id,
468+
ins->config) == 0) {
469+
continue;
470+
}
471+
472+
flb_routes_mask_clear_bit(chunk->routes_mask,
473+
route_path->ins->id,
474+
ins->config);
475+
476+
if (route_path->ins->total_limit_size == -1 ||
477+
chunk->fs_counted == FLB_FALSE) {
478+
continue;
479+
}
480+
481+
if (chunk_size == -1) {
482+
chunk_size = flb_input_chunk_get_real_size(chunk);
483+
if (chunk_size <= 0) {
484+
chunk_size = 0;
485+
}
486+
}
487+
488+
if (chunk_size > 0) {
489+
chunk_size_sz = (size_t) chunk_size;
490+
if (route_path->ins->fs_chunks_size > chunk_size_sz) {
491+
route_path->ins->fs_chunks_size -= chunk_size_sz;
492+
}
493+
else {
494+
route_path->ins->fs_chunks_size = 0;
495+
}
496+
}
497+
}
498+
}
499+
435500
static int input_has_conditional_routes(struct flb_input_instance *ins)
436501
{
437502
struct cfl_list *head;
@@ -493,6 +558,13 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
493558
return 0;
494559
}
495560

561+
/* Conditional routing not supported for threaded inputs */
562+
if (flb_input_is_threaded(ins)) {
563+
flb_plg_warn(ins, "conditional routing not supported for threaded inputs, "
564+
"falling back to normal routing");
565+
return 0;
566+
}
567+
496568
cfl_list_init(&payloads);
497569
cfl_list_foreach(head, &ins->routes_direct) {
498570
route_path = cfl_list_entry(head, struct flb_router_path, _head);
@@ -539,6 +611,9 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
539611
base_tag_len = strlen(ins->name);
540612
}
541613
}
614+
else if (base_tag_len == 0) {
615+
base_tag_len = strlen(base_tag);
616+
}
542617

543618
chunk = flb_event_chunk_create(FLB_EVENT_TYPE_LOGS,
544619
records,
@@ -717,9 +792,15 @@ static int input_log_append(struct flb_input_instance *ins,
717792
const void *buf, size_t buf_size)
718793
{
719794
int ret;
795+
int conditional_result;
796+
int conditional_handled = FLB_FALSE;
720797
int processor_is_active;
721798
void *out_buf = (void *) buf;
799+
size_t dummy = 0;
722800
size_t out_size = buf_size;
801+
const char *base_tag = tag;
802+
size_t base_tag_len = tag_len;
803+
struct flb_input_chunk *chunk = NULL;
723804

724805
processor_is_active = flb_processor_is_active(ins->processor);
725806
if (processor_is_active) {
@@ -754,15 +835,33 @@ static int input_log_append(struct flb_input_instance *ins,
754835
}
755836
}
756837

757-
ret = split_and_append_route_payloads(ins, records, tag, tag_len,
758-
out_buf, out_size);
759-
if (ret < 0) {
838+
if (!base_tag) {
839+
if (ins->tag && ins->tag_len > 0) {
840+
base_tag = ins->tag;
841+
base_tag_len = ins->tag_len;
842+
}
843+
else {
844+
base_tag = ins->name;
845+
base_tag_len = strlen(ins->name);
846+
}
847+
}
848+
else if (base_tag_len == 0) {
849+
base_tag_len = strlen(base_tag);
850+
}
851+
852+
conditional_result = split_and_append_route_payloads(ins, records, tag, tag_len,
853+
out_buf, out_size);
854+
if (conditional_result < 0) {
760855
if (processor_is_active && buf != out_buf) {
761856
flb_free(out_buf);
762857
}
763858
return -1;
764859
}
765860

861+
if (conditional_result > 0) {
862+
conditional_handled = FLB_TRUE;
863+
}
864+
766865
/*
767866
* Always call flb_input_chunk_append_raw to ensure non-conditional routes
768867
* receive data even when conditional routes exist. The conditional routing
@@ -771,6 +870,19 @@ static int input_log_append(struct flb_input_instance *ins,
771870
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
772871
tag, tag_len, out_buf, out_size);
773872

873+
if (ret == 0 && conditional_handled == FLB_TRUE && base_tag) {
874+
chunk = NULL;
875+
dummy = 0;
876+
877+
if (flb_hash_table_get(ins->ht_log_chunks,
878+
base_tag,
879+
base_tag_len,
880+
(void **) &chunk,
881+
&dummy) >= 0 && chunk) {
882+
input_chunk_remove_conditional_routes(ins, chunk);
883+
}
884+
}
885+
774886
if (processor_is_active && buf != out_buf) {
775887
flb_free(out_buf);
776888
}

0 commit comments

Comments
 (0)