diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index accde0b910b..fc01d0e9dd4 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -52,6 +52,8 @@ #include #include +#include + #include #include @@ -335,10 +337,10 @@ struct flb_input_instance { struct mk_list _head; /* link to config->inputs */ - struct mk_list routes_direct; /* direct routes set by API */ - struct mk_list routes; /* flb_router_path's list */ - struct mk_list properties; /* properties / configuration */ - struct mk_list collectors; /* collectors */ + struct cfl_list routes_direct; /* direct routes set by API */ + struct cfl_list routes; /* flb_router_path's list */ + struct mk_list properties; /* properties / configuration */ + struct mk_list collectors; /* collectors */ /* Storage Chunks */ struct mk_list chunks; /* linked list of all chunks */ diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index 696e40ac4ba..b9b55abe28d 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -113,6 +113,7 @@ int flb_input_chunk_get_tag(struct flb_input_chunk *ic, void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins); void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data); ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic); +ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic); size_t flb_input_chunk_set_limits(struct flb_input_instance *in); size_t flb_input_chunk_total_size(struct flb_input_instance *in); struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index a8e4f2dddab..8b34e0af160 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -26,12 +26,24 @@ #include #include #include +#include #include #include +struct flb_mp_chunk_cobj; +struct flb_log_event_encoder; +struct flb_log_event_decoder; + +struct flb_router_chunk_context { + struct flb_mp_chunk_cobj *chunk_cobj; + struct flb_log_event_encoder *log_encoder; + struct flb_log_event_decoder *log_decoder; +}; + struct flb_router_path { struct flb_output_instance *ins; - struct mk_list _head; + struct flb_route *route; + struct cfl_list _head; }; static inline int flb_router_match_type(int in_event_type, @@ -74,12 +86,17 @@ struct flb_route_condition_rule { flb_sds_t field; flb_sds_t op; flb_sds_t value; + flb_sds_t *values; + size_t values_count; struct cfl_list _head; }; struct flb_route_condition { struct cfl_list rules; int is_default; + enum flb_condition_operator op; + struct flb_condition *compiled; + int compiled_status; }; struct flb_route_output { @@ -104,6 +121,7 @@ struct flb_route { flb_sds_t name; uint32_t signals; struct flb_route_condition *condition; + int per_record_routing; struct cfl_list outputs; struct cfl_list processors; struct cfl_list _head; @@ -128,14 +146,29 @@ void flb_router_exit(struct flb_config *config); uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk); +int flb_router_chunk_context_init(struct flb_router_chunk_context *context); +void flb_router_chunk_context_reset(struct flb_router_chunk_context *context); +void flb_router_chunk_context_destroy(struct flb_router_chunk_context *context); +int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *context, + struct flb_event_chunk *chunk); + int flb_route_condition_eval(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route); int flb_condition_eval_logs(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route); int flb_condition_eval_metrics(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route); int flb_condition_eval_traces(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route); +int flb_router_path_should_route(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, + struct flb_router_path *path); + +struct flb_condition *flb_router_route_get_condition(struct flb_route *route); struct flb_cf; diff --git a/src/flb_conditionals.c b/src/flb_conditionals.c index 484931811ac..05f943be17e 100644 --- a/src/flb_conditionals.c +++ b/src/flb_conditionals.c @@ -363,6 +363,8 @@ int flb_condition_evaluate(struct flb_condition *cond, struct flb_condition_rule *rule; struct cfl_variant *record_variant; int result; + int any_rule_evaluated = FLB_FALSE; + int any_rule_matched = FLB_FALSE; if (!cond || !record) { flb_trace("[condition] NULL condition or record, returning TRUE"); @@ -382,9 +384,16 @@ int flb_condition_evaluate(struct flb_condition *cond, /* Get the variant for this rule's context */ record_variant = get_record_variant(record, rule->context); + any_rule_evaluated = FLB_TRUE; if (!record_variant) { flb_trace("[condition] no record variant found for context %d", rule->context); - continue; + if (cond->op == FLB_COND_OP_AND) { + flb_trace("[condition] AND condition missing field, returning FALSE"); + return FLB_FALSE; + } + else { + continue; + } } flb_trace("[condition] evaluating rule against record"); @@ -399,8 +408,22 @@ int flb_condition_evaluate(struct flb_condition *cond, flb_trace("[condition] OR condition with TRUE result, short-circuiting"); return FLB_TRUE; } + + if (result == FLB_TRUE) { + any_rule_matched = FLB_TRUE; + } + } + + if (cond->op == FLB_COND_OP_OR) { + flb_trace("[condition] final evaluation result: %d", any_rule_matched); + return any_rule_matched; } - flb_trace("[condition] final evaluation result: %d", (cond->op == FLB_COND_OP_AND) ? FLB_TRUE : FLB_FALSE); - return (cond->op == FLB_COND_OP_AND) ? FLB_TRUE : FLB_FALSE; -} \ No newline at end of file + if (any_rule_evaluated == FLB_FALSE) { + flb_trace("[condition] no rules evaluated, defaulting to FALSE for AND condition"); + return FLB_FALSE; + } + + flb_trace("[condition] final evaluation result: TRUE"); + return FLB_TRUE; +} diff --git a/src/flb_event.c b/src/flb_event.c index 2edf3a29643..906344e05ec 100644 --- a/src/flb_event.c +++ b/src/flb_event.c @@ -23,7 +23,6 @@ #include #include - struct flb_event_chunk *flb_event_chunk_create(int type, int total_events, char *tag_buf, int tag_len, diff --git a/src/flb_input.c b/src/flb_input.c index 1d380b6ed7b..97ebb1e4ad9 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -352,8 +352,8 @@ struct flb_input_instance *flb_input_new(struct flb_config *config, instance->host.ipv6 = FLB_FALSE; /* Initialize list heads */ - mk_list_init(&instance->routes_direct); - mk_list_init(&instance->routes); + cfl_list_init(&instance->routes_direct); + cfl_list_init(&instance->routes); mk_list_init(&instance->tasks); mk_list_init(&instance->chunks); mk_list_init(&instance->collectors); diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index b8f431452c4..9f1c5af4e33 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -96,7 +96,6 @@ static int flb_input_chunk_drop_task_route( struct flb_output_instance *o_ins, ssize_t *dropped_record_count); -static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic); static ssize_t get_input_chunk_record_count(struct flb_input_chunk *input_chunk) { @@ -283,7 +282,7 @@ ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic) * is used to track the size of chunks in filesystem so we need to call * cio_chunk_get_real_size to return the original size in the file system */ -static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic) +ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic) { ssize_t meta_size; ssize_t size; diff --git a/src/flb_input_log.c b/src/flb_input_log.c index ac4b5cdfa67..ae06d079379 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -17,12 +17,798 @@ * limitations under the License. */ +#include "fluent-bit/flb_pack.h" #include +#include #include #include +#include #include #include +#include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +struct flb_route_payload { + struct flb_route *route; + int is_default; + flb_sds_t tag; + char *data; + size_t size; + size_t total_records; + struct cfl_list _head; +}; + +static void route_payload_destroy(struct flb_route_payload *payload) +{ + if (!payload) { + return; + } + + if (!cfl_list_entry_is_orphan(&payload->_head)) { + cfl_list_del(&payload->_head); + } + + if (payload->tag) { + flb_sds_destroy(payload->tag); + } + + if (payload->data) { + flb_free(payload->data); + } + + flb_free(payload); +} + +static struct flb_route_payload *route_payload_find(struct cfl_list *payloads, + struct flb_route *route) +{ + struct cfl_list *head; + struct flb_route_payload *payload; + + if (!payloads || !route) { + return NULL; + } + + cfl_list_foreach(head, payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); + + if (payload->route == route) { + return payload; + } + } + + return NULL; +} + +static int route_payload_apply_outputs(struct flb_input_instance *ins, + struct flb_route_payload *payload) +{ + int ret; + int routes_found = 0; + size_t out_size = 0; + size_t chunk_size_sz = 0; + ssize_t chunk_size; + struct cfl_list *head; + struct flb_input_chunk *chunk = NULL; + struct flb_router_path *route_path; + + if (!ins || !payload || !payload->tag || !payload->route) { + return -1; + } + + if (!ins->ht_log_chunks || !ins->config) { + return -1; + } + + ret = flb_hash_table_get(ins->ht_log_chunks, + payload->tag, + flb_sds_len(payload->tag), + (void **) &chunk, + &out_size); + if (ret == -1 || !chunk || !chunk->routes_mask) { + return -1; + } + + if (chunk->fs_counted == FLB_TRUE) { + chunk_size = flb_input_chunk_get_real_size(chunk); + if (chunk_size > 0) { + chunk_size_sz = (size_t) chunk_size; + } + else { + chunk_size = 0; + } + } + else { + chunk_size = 0; + } + + if (chunk_size_sz > 0) { + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + + if (!route_path->ins) { + continue; + } + + if (flb_routes_mask_get_bit(chunk->routes_mask, + route_path->ins->id, + ins->config) == 0) { + continue; + } + + if (route_path->route == payload->route) { + continue; + } + + if (route_path->ins->total_limit_size != -1) { + if (route_path->ins->fs_chunks_size > chunk_size_sz) { + route_path->ins->fs_chunks_size -= chunk_size_sz; + } + else { + route_path->ins->fs_chunks_size = 0; + } + } + + flb_routes_mask_clear_bit(chunk->routes_mask, + route_path->ins->id, + ins->config); + } + } + + memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * ins->config->route_mask_size); + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + if (route_path->route != payload->route || !route_path->ins) { + continue; + } + flb_routes_mask_set_bit(chunk->routes_mask, + route_path->ins->id, + ins->config); + routes_found++; + } + + if (flb_routes_mask_is_empty(chunk->routes_mask, ins->config) == FLB_TRUE) { + return -1; + } + + if (chunk->fs_counted == FLB_FALSE) { + chunk_size = flb_input_chunk_get_real_size(chunk); + if (chunk_size > 0) { + flb_input_chunk_update_output_instances(chunk, + (size_t) chunk_size); + } + } + + return 0; +} + +static int encode_empty_map(char **out_buf, size_t *out_size) +{ + char *buf; + + if (!out_buf || !out_size) { + return -1; + } + + buf = flb_malloc(1); + if (!buf) { + flb_errno(); + return -1; + } + + buf[0] = 0x80; + + *out_buf = buf; + *out_size = 1; + + return 0; +} + +static int encode_cfl_object_or_empty(struct cfl_object *obj, + char **out_buf, + size_t *out_size) +{ + if (!out_buf || !out_size) { + return -1; + } + + if (obj) { + return flb_mp_cfl_to_msgpack(obj, out_buf, out_size); + } + + return encode_empty_map(out_buf, out_size); +} + +static int encode_chunk_record(struct flb_log_event_encoder *encoder, + struct flb_mp_chunk_record *record) +{ + int ret; + int record_type; + char *mp_buf = NULL; + size_t mp_size = 0; + + if (!encoder || !record) { + return -1; + } + + ret = flb_log_event_encoder_begin_record(encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_set_timestamp(encoder, &record->event.timestamp); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + if (record->event.timestamp.tm.tv_sec >= 0) { + record_type = FLB_LOG_EVENT_NORMAL; + } + else if (record->event.timestamp.tm.tv_sec == FLB_LOG_EVENT_GROUP_START) { + record_type = FLB_LOG_EVENT_GROUP_START; + } + else if (record->event.timestamp.tm.tv_sec == FLB_LOG_EVENT_GROUP_END) { + record_type = FLB_LOG_EVENT_GROUP_END; + } + else { + record_type = FLB_LOG_EVENT_NORMAL; + } + + ret = encode_cfl_object_or_empty(record->cobj_metadata, &mp_buf, &mp_size); + if (ret != 0) { + return -1; + } + + ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(encoder, + mp_buf, + mp_size); + flb_free(mp_buf); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + mp_buf = NULL; + mp_size = 0; + + if (record_type == FLB_LOG_EVENT_GROUP_START && + record->cobj_group_attributes) { + ret = flb_mp_cfl_to_msgpack(record->cobj_group_attributes, + &mp_buf, + &mp_size); + } + else if (record->cobj_record) { + ret = flb_mp_cfl_to_msgpack(record->cobj_record, &mp_buf, &mp_size); + } + else { + ret = encode_empty_map(&mp_buf, &mp_size); + } + + if (ret != 0) { + return -1; + } + + ret = flb_log_event_encoder_set_body_from_raw_msgpack(encoder, + mp_buf, + mp_size); + flb_free(mp_buf); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_commit_record(encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + return 0; +} + +static int build_payload_for_route(struct flb_route_payload *payload, + struct flb_mp_chunk_record **records, + size_t record_count, + uint8_t *matched_non_default) +{ + size_t i; + int ret; + int condition_result; + int matched; + struct flb_condition *compiled; + struct flb_log_event_encoder *encoder; + + if (!payload || !records || record_count == 0 || !matched_non_default) { + return -1; + } + + + compiled = flb_router_route_get_condition(payload->route); + if (!compiled) { + return 0; + } + + encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); + if (!encoder) { + return -1; + } + + matched = 0; + + for (i = 0; i < record_count; i++) { + condition_result = flb_condition_evaluate(compiled, records[i]); + if (condition_result != FLB_TRUE) { + continue; + } + + ret = encode_chunk_record(encoder, records[i]); + if (ret != 0) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + matched_non_default[i] = 1; + matched++; + } + + if (matched == 0) { + flb_log_event_encoder_destroy(encoder); + return 0; + } + + payload->data = flb_malloc(encoder->output_length); + if (!payload->data) { + flb_log_event_encoder_destroy(encoder); + flb_errno(); + return -1; + } + + memcpy(payload->data, encoder->output_buffer, encoder->output_length); + payload->size = encoder->output_length; + payload->total_records = matched; + + flb_log_event_encoder_destroy(encoder); + + return 0; +} + +static int build_payload_for_default_route(struct flb_route_payload *payload, + struct flb_mp_chunk_record **records, + size_t record_count, + uint8_t *matched_non_default) +{ + size_t i; + int matched; + int ret; + struct flb_condition *compiled; + struct flb_log_event_encoder *encoder; + + if (!payload || !records || !matched_non_default) { + return -1; + } + + encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); + if (!encoder) { + return -1; + } + + compiled = flb_router_route_get_condition(payload->route); + matched = 0; + + for (i = 0; i < record_count; i++) { + if (matched_non_default[i]) { + continue; + } + + if (compiled && + flb_condition_evaluate(compiled, records[i]) != FLB_TRUE) { + continue; + } + + ret = encode_chunk_record(encoder, records[i]); + if (ret != 0) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + matched++; + } + + if (matched == 0) { + flb_log_event_encoder_destroy(encoder); + return 0; + } + + payload->data = flb_malloc(encoder->output_length); + if (!payload->data) { + flb_log_event_encoder_destroy(encoder); + flb_errno(); + return -1; + } + + memcpy(payload->data, encoder->output_buffer, encoder->output_length); + payload->size = encoder->output_length; + payload->total_records = matched; + + flb_log_event_encoder_destroy(encoder); + + return 0; +} + +static void route_payload_list_destroy(struct cfl_list *payloads) +{ + struct cfl_list *head; + struct cfl_list *tmp; + struct flb_route_payload *payload; + + if (!payloads) { + return; + } + + cfl_list_foreach_safe(head, tmp, payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); + route_payload_destroy(payload); + } +} + +static void input_chunk_remove_conditional_routes(struct flb_input_instance *ins, + struct flb_input_chunk *chunk) +{ + ssize_t chunk_size; + size_t chunk_size_sz; + struct cfl_list *head; + struct flb_router_path *route_path; + + if (!ins || !chunk || !chunk->routes_mask || !ins->config) { + return; + } + + chunk_size = -1; + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + + if (!route_path->route || !route_path->ins) { + continue; + } + + if (!route_path->route->condition && + !route_path->route->per_record_routing) { + continue; + } + + if (flb_routes_mask_get_bit(chunk->routes_mask, + route_path->ins->id, + ins->config) == 0) { + continue; + } + + flb_routes_mask_clear_bit(chunk->routes_mask, + route_path->ins->id, + ins->config); + + if (route_path->ins->total_limit_size == -1 || + chunk->fs_counted == FLB_FALSE) { + continue; + } + + if (chunk_size == -1) { + chunk_size = flb_input_chunk_get_real_size(chunk); + if (chunk_size <= 0) { + chunk_size = 0; + } + } + + if (chunk_size > 0) { + chunk_size_sz = (size_t) chunk_size; + if (route_path->ins->fs_chunks_size > chunk_size_sz) { + route_path->ins->fs_chunks_size -= chunk_size_sz; + } + else { + route_path->ins->fs_chunks_size = 0; + } + } + } +} + +static int input_has_conditional_routes(struct flb_input_instance *ins) +{ + struct cfl_list *head; + struct flb_router_path *route_path; + + if (!ins) { + return FLB_FALSE; + } + + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + if (route_path->route && (route_path->route->condition || route_path->route->per_record_routing)) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int split_and_append_route_payloads(struct flb_input_instance *ins, + size_t records, + const char *tag, + size_t tag_len, + const void *buf, + size_t buf_size) +{ + int ret; + int appended; + int handled; + int context_initialized = FLB_FALSE; + size_t out_size = 0; + uint8_t *matched_non_default = NULL; + struct cfl_list payloads; + struct cfl_list *head; + struct cfl_list *tmp; + struct flb_router_path *route_path; + struct flb_route_payload *payload; + struct flb_router_chunk_context context; + struct flb_event_chunk *chunk; + struct flb_mp_chunk_record **records_array = NULL; + struct flb_input_chunk *orphaned_chunk = NULL; + + size_t record_count; + size_t index; + size_t base_tag_len = tag_len; + const char *base_tag = tag; + + handled = FLB_FALSE; + + if (!ins || !buf || buf_size == 0) { + return 0; + } + + if (cfl_list_size(&ins->routes_direct) == 0) { + return 0; + } + + if (input_has_conditional_routes(ins) == FLB_FALSE) { + return 0; + } + + /* Conditional routing not supported for threaded inputs */ + if (flb_input_is_threaded(ins)) { + flb_plg_warn(ins, "conditional routing not supported for threaded inputs, " + "falling back to normal routing"); + return 0; + } + + cfl_list_init(&payloads); + cfl_list_foreach(head, &ins->routes_direct) { + route_path = cfl_list_entry(head, struct flb_router_path, _head); + if (!route_path->route || + (!route_path->route->condition && !route_path->route->per_record_routing)) { + continue; + } + + payload = route_payload_find(&payloads, route_path->route); + if (!payload) { + payload = flb_calloc(1, sizeof(struct flb_route_payload)); + if (!payload) { + flb_errno(); + route_payload_list_destroy(&payloads); + return -1; + } + + payload->route = route_path->route; + payload->is_default = route_path->route->condition ? route_path->route->condition->is_default : 0; + + /* Use the route name as the tag */ + payload->tag = flb_sds_create(route_path->route->name); + if (!payload->tag) { + flb_free(payload); + route_payload_list_destroy(&payloads); + return -1; + } + cfl_list_add(&payload->_head, &payloads); + } + } + + if (cfl_list_size(&payloads) == 0) { + return 0; + } + + handled = FLB_TRUE; + if (!base_tag) { + if (ins->tag && ins->tag_len > 0) { + base_tag = ins->tag; + base_tag_len = ins->tag_len; + } + else { + base_tag = ins->name; + base_tag_len = strlen(ins->name); + } + } + else if (base_tag_len == 0) { + base_tag_len = strlen(base_tag); + } + + chunk = flb_event_chunk_create(FLB_EVENT_TYPE_LOGS, + records, + (char *) base_tag, + base_tag_len, + (char *) buf, + buf_size); + if (!chunk) { + route_payload_list_destroy(&payloads); + return -1; + } + + if (flb_router_chunk_context_init(&context) != 0) { + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + context_initialized = FLB_TRUE; + + ret = flb_router_chunk_context_prepare_logs(&context, chunk); + if (ret != 0 || !context.chunk_cobj) { + if (context_initialized) { + flb_router_chunk_context_destroy(&context); + } + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + + record_count = cfl_list_size(&context.chunk_cobj->records); + if (record_count == 0) { + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return handled ? 1 : 0; + } + + records_array = flb_calloc(record_count, + sizeof(struct flb_mp_chunk_record *)); + if (!records_array) { + flb_errno(); + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + + matched_non_default = flb_calloc(record_count, sizeof(uint8_t)); + if (!records_array || !matched_non_default) { + flb_errno(); + if (records_array) { + flb_free(records_array); + } + if (matched_non_default) { + flb_free(matched_non_default); + } + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + + index = 0; + cfl_list_foreach(head, &context.chunk_cobj->records) { + records_array[index++] = cfl_list_entry(head, struct flb_mp_chunk_record, _head); + } + + cfl_list_foreach(head, &payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); + if (payload->is_default) { + continue; + } + + ret = build_payload_for_route(payload, + records_array, + record_count, + matched_non_default); + if (ret != 0) { + flb_free(records_array); + flb_free(matched_non_default); + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + } + + cfl_list_foreach(head, &payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); + if (!payload->is_default) { + continue; + } + + ret = build_payload_for_default_route(payload, + records_array, + record_count, + matched_non_default); + if (ret != 0) { + flb_free(records_array); + flb_free(matched_non_default); + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + } + + flb_free(records_array); + flb_free(matched_non_default); + + cfl_list_foreach_safe(head, tmp, &payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); + + if (payload->total_records <= 0 || !payload->data) { + route_payload_destroy(payload); + } + } + + appended = 0; + cfl_list_foreach(head, &payloads) { + payload = cfl_list_entry(head, struct flb_route_payload, _head); + + ret = flb_input_chunk_append_raw(ins, + FLB_INPUT_LOGS, + payload->total_records, + payload->tag, + flb_sds_len(payload->tag), + payload->data, + payload->size); + if (ret != 0) { + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + + if (route_payload_apply_outputs(ins, payload) != 0) { + /* Clean up the orphaned chunk from ht_log_chunks */ + orphaned_chunk = NULL; + out_size = 0; + ret = flb_hash_table_get(ins->ht_log_chunks, + payload->tag, + flb_sds_len(payload->tag), + (void **) &orphaned_chunk, + &out_size); + if (ret >= 0 && orphaned_chunk) { + flb_hash_table_del_ptr(ins->ht_log_chunks, + payload->tag, + flb_sds_len(payload->tag), + (void *) orphaned_chunk); + /* Destroy the orphaned chunk completely */ + flb_input_chunk_destroy(orphaned_chunk, FLB_TRUE); + } + flb_router_chunk_context_destroy(&context); + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + return -1; + } + + appended++; + } + + if (context_initialized) { + flb_router_chunk_context_destroy(&context); + } + route_payload_list_destroy(&payloads); + flb_event_chunk_destroy(chunk); + + return handled ? (appended > 0 ? appended : 1) : 0; +} static int input_log_append(struct flb_input_instance *ins, size_t processor_starting_stage, @@ -31,9 +817,15 @@ static int input_log_append(struct flb_input_instance *ins, const void *buf, size_t buf_size) { int ret; + int conditional_result; + int conditional_handled = FLB_FALSE; int processor_is_active; void *out_buf = (void *) buf; + size_t dummy = 0; size_t out_size = buf_size; + const char *base_tag = tag; + size_t base_tag_len = tag_len; + struct flb_input_chunk *chunk = NULL; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -68,9 +860,53 @@ static int input_log_append(struct flb_input_instance *ins, } } + if (!base_tag) { + if (ins->tag && ins->tag_len > 0) { + base_tag = ins->tag; + base_tag_len = ins->tag_len; + } + else { + base_tag = ins->name; + base_tag_len = strlen(ins->name); + } + } + else if (base_tag_len == 0) { + base_tag_len = strlen(base_tag); + } + + conditional_result = split_and_append_route_payloads(ins, records, tag, tag_len, + out_buf, out_size); + if (conditional_result < 0) { + if (processor_is_active && buf != out_buf) { + flb_free(out_buf); + } + return -1; + } + + if (conditional_result > 0) { + conditional_handled = FLB_TRUE; + } + + /* + * Always call flb_input_chunk_append_raw to ensure non-conditional routes + * receive data even when conditional routes exist. The conditional routing + * should be additive, not exclusive. + */ ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, tag, tag_len, out_buf, out_size); + if (ret == 0 && conditional_handled == FLB_TRUE && base_tag) { + chunk = NULL; + dummy = 0; + + if (flb_hash_table_get(ins->ht_log_chunks, + base_tag, + base_tag_len, + (void **) &chunk, + &dummy) >= 0 && chunk) { + input_chunk_remove_conditional_routes(ins, chunk); + } + } if (processor_is_active && buf != out_buf) { flb_free(out_buf); diff --git a/src/flb_router.c b/src/flb_router.c index 9312f5ffed0..47251d52023 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -155,7 +155,8 @@ int flb_router_connect(struct flb_input_instance *in, } p->ins = out; - mk_list_add(&p->_head, &in->routes); + p->route = NULL; + cfl_list_add(&p->_head, &in->routes); return 0; } @@ -172,7 +173,8 @@ int flb_router_connect_direct(struct flb_input_instance *in, } p->ins = out; - mk_list_add(&p->_head, &in->routes_direct); + p->route = NULL; + cfl_list_add(&p->_head, &in->routes_direct); return 0; } @@ -271,9 +273,9 @@ int flb_router_io_set(struct flb_config *config) void flb_router_exit(struct flb_config *config) { struct mk_list *tmp; - struct mk_list *r_tmp; + struct cfl_list *r_tmp; struct mk_list *head; - struct mk_list *r_head; + struct cfl_list *r_head; struct flb_input_instance *in; struct flb_router_path *r; @@ -282,16 +284,16 @@ void flb_router_exit(struct flb_config *config) in = mk_list_entry(head, struct flb_input_instance, _head); /* Iterate instance routes */ - mk_list_foreach_safe(r_head, r_tmp, &in->routes) { - r = mk_list_entry(r_head, struct flb_router_path, _head); - mk_list_del(&r->_head); + cfl_list_foreach_safe(r_head, r_tmp, &in->routes) { + r = cfl_list_entry(r_head, struct flb_router_path, _head); + cfl_list_del(&r->_head); flb_free(r); } /* Iterate instance routes direct */ - mk_list_foreach_safe(r_head, r_tmp, &in->routes_direct) { - r = mk_list_entry(r_head, struct flb_router_path, _head); - mk_list_del(&r->_head); + cfl_list_foreach_safe(r_head, r_tmp, &in->routes_direct) { + r = cfl_list_entry(r_head, struct flb_router_path, _head); + cfl_list_del(&r->_head); flb_free(r); } } diff --git a/src/flb_router_condition.c b/src/flb_router_condition.c index 318227a094a..d24fcc67ba1 100644 --- a/src/flb_router_condition.c +++ b/src/flb_router_condition.c @@ -20,6 +20,118 @@ #include #include #include +#include +#include +#include +#include + +#define FLB_ROUTE_CONDITION_COMPILED_SUCCESS 1 +#define FLB_ROUTE_CONDITION_COMPILED_FAILURE -1 + +static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition); + +int flb_router_chunk_context_init(struct flb_router_chunk_context *context) +{ + if (!context) { + return -1; + } + + context->chunk_cobj = NULL; + context->log_encoder = NULL; + context->log_decoder = NULL; + + return 0; +} + +void flb_router_chunk_context_reset(struct flb_router_chunk_context *context) +{ + if (!context) { + return; + } + + if (context->chunk_cobj) { + flb_mp_chunk_cobj_destroy(context->chunk_cobj); + context->chunk_cobj = NULL; + } + + if (context->log_decoder) { + flb_log_event_decoder_destroy(context->log_decoder); + context->log_decoder = NULL; + } + + if (context->log_encoder) { + flb_log_event_encoder_destroy(context->log_encoder); + context->log_encoder = NULL; + } +} + +void flb_router_chunk_context_destroy(struct flb_router_chunk_context *context) +{ + flb_router_chunk_context_reset(context); +} + +int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *context, + struct flb_event_chunk *chunk) +{ + int ret; + struct flb_mp_chunk_record *record; + + if (!context || !chunk) { + return -1; + } + + if (chunk->type != FLB_EVENT_TYPE_LOGS) { + return 0; + } + + if (context->chunk_cobj) { + return 0; + } + + if (!chunk->data || chunk->size == 0) { + return -1; + } + + if (!context->log_encoder) { + context->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); + if (!context->log_encoder) { + return -1; + } + } + + if (!context->log_decoder) { + context->log_decoder = flb_log_event_decoder_create(NULL, 0); + if (!context->log_decoder) { + flb_router_chunk_context_reset(context); + return -1; + } + flb_log_event_decoder_read_groups(context->log_decoder, FLB_TRUE); + } + + flb_log_event_decoder_reset(context->log_decoder, chunk->data, chunk->size); + + context->chunk_cobj = flb_mp_chunk_cobj_create(context->log_encoder, + context->log_decoder); + if (!context->chunk_cobj) { + flb_router_chunk_context_reset(context); + return -1; + } + + while ((ret = flb_mp_chunk_cobj_record_next(context->chunk_cobj, &record)) == + FLB_MP_CHUNK_RECORD_OK) { + continue; + } + + if (ret != FLB_MP_CHUNK_RECORD_EOF) { + flb_router_chunk_context_reset(context); + return -1; + } + + context->chunk_cobj->record_pos = NULL; + context->chunk_cobj->condition = NULL; + + return 0; +} uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk) { @@ -42,41 +154,70 @@ uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk) } int flb_condition_eval_logs(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route) { - (void) chunk; - (void) route; + int result = FLB_FALSE; + struct flb_route_condition *condition; + struct flb_condition *compiled; + struct flb_mp_chunk_record *record; + struct cfl_list *head; - /* - * The full condition evaluation engine requires field resolvers that map - * record accessors to the different telemetry payload shapes. The wiring - * of those resolvers is part of a bigger effort and will be implemented in - * follow-up changes. For the time being we simply report that the - * condition failed so that the runtime can rely on explicit default - * routes. - */ - return FLB_FALSE; + if (!chunk || !context || !route || !route->condition) { + return FLB_FALSE; + } + + condition = route->condition; + + compiled = route_condition_get_compiled(condition); + if (!compiled) { + return FLB_FALSE; + } + + if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) { + return FLB_FALSE; + } + + if (!context->chunk_cobj) { + return FLB_FALSE; + } + + cfl_list_foreach(head, &context->chunk_cobj->records) { + record = cfl_list_entry(head, struct flb_mp_chunk_record, _head); + + if (flb_condition_evaluate(compiled, record) == FLB_TRUE) { + result = FLB_TRUE; + break; + } + } + + return result; } int flb_condition_eval_metrics(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route) { (void) chunk; + (void) context; (void) route; return FLB_FALSE; } int flb_condition_eval_traces(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route) { (void) chunk; + (void) context; (void) route; return FLB_FALSE; } int flb_route_condition_eval(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, struct flb_route *route) { uint32_t signal; @@ -94,8 +235,7 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk, return FLB_FALSE; } - if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && - ((route->signals & signal) == 0)) { + if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && ((route->signals & signal) == 0)) { return FLB_FALSE; } @@ -103,17 +243,13 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk, return FLB_TRUE; } - if (cfl_list_is_empty(&route->condition->rules) == 0) { - return FLB_TRUE; - } - switch (signal) { case FLB_ROUTER_SIGNAL_LOGS: - return flb_condition_eval_logs(chunk, route); + return flb_condition_eval_logs(chunk, context, route); case FLB_ROUTER_SIGNAL_METRICS: - return flb_condition_eval_metrics(chunk, route); + return flb_condition_eval_metrics(chunk, context, route); case FLB_ROUTER_SIGNAL_TRACES: - return flb_condition_eval_traces(chunk, route); + return flb_condition_eval_traces(chunk, context, route); default: break; } @@ -121,3 +257,204 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk, return FLB_FALSE; } +int flb_router_path_should_route(struct flb_event_chunk *chunk, + struct flb_router_chunk_context *context, + struct flb_router_path *path) +{ + if (!path) { + return FLB_FALSE; + } + + if (chunk && chunk->type == FLB_EVENT_TYPE_LOGS) { + if (!context) { + return FLB_FALSE; + } + + if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) { + return FLB_FALSE; + } + } + + if (!path->route) { + return FLB_TRUE; + } + + return flb_route_condition_eval(chunk, context, path->route); +} + +struct flb_condition *flb_router_route_get_condition(struct flb_route *route) +{ + if (!route || !route->condition) { + return NULL; + } + + return route_condition_get_compiled(route->condition); +} + +static int parse_rule_operator(const flb_sds_t op_str, + enum flb_rule_operator *out) +{ + if (!op_str || !out) { + return -1; + } + + if (strcasecmp(op_str, "eq") == 0) { + *out = FLB_RULE_OP_EQ; + } + else if (strcasecmp(op_str, "neq") == 0) { + *out = FLB_RULE_OP_NEQ; + } + else if (strcasecmp(op_str, "gt") == 0) { + *out = FLB_RULE_OP_GT; + } + else if (strcasecmp(op_str, "lt") == 0) { + *out = FLB_RULE_OP_LT; + } + else if (strcasecmp(op_str, "gte") == 0) { + *out = FLB_RULE_OP_GTE; + } + else if (strcasecmp(op_str, "lte") == 0) { + *out = FLB_RULE_OP_LTE; + } + else if (strcasecmp(op_str, "regex") == 0) { + *out = FLB_RULE_OP_REGEX; + } + else if (strcasecmp(op_str, "not_regex") == 0) { + *out = FLB_RULE_OP_NOT_REGEX; + } + else if (strcasecmp(op_str, "in") == 0) { + *out = FLB_RULE_OP_IN; + } + else if (strcasecmp(op_str, "not_in") == 0) { + *out = FLB_RULE_OP_NOT_IN; + } + else { + return -1; + } + + return 0; +} + +static int parse_numeric_value(flb_sds_t value, double *out) +{ + char *endptr = NULL; + double result; + + if (!value || !out) { + return -1; + } + + errno = 0; + result = strtod(value, &endptr); + if (errno == ERANGE || endptr == value || (endptr && *endptr != '\0')) { + return -1; + } + + *out = result; + return 0; +} + +static struct flb_condition *route_condition_compile(struct flb_route_condition *condition) +{ + int ret; + double numeric_value; + enum flb_rule_operator op; + struct cfl_list *head; + struct flb_condition *compiled; + struct flb_route_condition_rule *rule; + + compiled = flb_condition_create(condition->op); + if (!compiled) { + return NULL; + } + + cfl_list_foreach(head, &condition->rules) { + rule = cfl_list_entry(head, struct flb_route_condition_rule, _head); + + if (!rule->field || !rule->op) { + flb_condition_destroy(compiled); + return NULL; + } + + if (parse_rule_operator(rule->op, &op) != 0) { + flb_condition_destroy(compiled); + return NULL; + } + + switch (op) { + case FLB_RULE_OP_EQ: + case FLB_RULE_OP_NEQ: + case FLB_RULE_OP_REGEX: + case FLB_RULE_OP_NOT_REGEX: + if (!rule->value) { + flb_condition_destroy(compiled); + return NULL; + } + ret = flb_condition_add_rule(compiled, rule->field, op, + rule->value, 1, RECORD_CONTEXT_BODY); + break; + case FLB_RULE_OP_GT: + case FLB_RULE_OP_LT: + case FLB_RULE_OP_GTE: + case FLB_RULE_OP_LTE: + if (!rule->value) { + flb_condition_destroy(compiled); + return NULL; + } + if (parse_numeric_value(rule->value, &numeric_value) != 0) { + flb_condition_destroy(compiled); + return NULL; + } + ret = flb_condition_add_rule(compiled, rule->field, op, + &numeric_value, 1, RECORD_CONTEXT_BODY); + break; + case FLB_RULE_OP_IN: + case FLB_RULE_OP_NOT_IN: + if (!rule->values || rule->values_count == 0) { + flb_condition_destroy(compiled); + return NULL; + } + ret = flb_condition_add_rule(compiled, rule->field, op, + rule->values, + (int) rule->values_count, + RECORD_CONTEXT_BODY); + break; + default: + flb_condition_destroy(compiled); + return NULL; + } + + if (ret != FLB_TRUE) { + flb_condition_destroy(compiled); + return NULL; + } + } + + return compiled; +} + +static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition) +{ + if (!condition) { + return NULL; + } + + if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_FAILURE) { + return NULL; + } + + if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_SUCCESS && + condition->compiled) { + return condition->compiled; + } + + condition->compiled = route_condition_compile(condition); + if (!condition->compiled) { + condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_FAILURE; + return NULL; + } + + condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_SUCCESS; + return condition->compiled; +} + diff --git a/src/flb_router_config.c b/src/flb_router_config.c index 0513a11afd2..da2da77ba6f 100644 --- a/src/flb_router_config.c +++ b/src/flb_router_config.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -276,10 +277,24 @@ static void route_condition_destroy(struct flb_route_condition *condition) if (rule->value) { flb_sds_destroy(rule->value); } + if (rule->values) { + size_t idx; + + for (idx = 0; idx < rule->values_count; idx++) { + if (rule->values[idx]) { + flb_sds_destroy(rule->values[idx]); + } + } + flb_free(rule->values); + } flb_free(rule); } + if (condition->compiled) { + flb_condition_destroy(condition->compiled); + } + flb_free(condition); } @@ -530,6 +545,8 @@ static struct flb_route_condition_rule *parse_condition_rule(struct cfl_variant return NULL; } cfl_list_init(&rule->_head); + rule->values = NULL; + rule->values_count = 0; rule->field = copy_from_cfl_sds(field_var->data.as_string); if (!rule->field) { @@ -544,9 +561,56 @@ static struct flb_route_condition_rule *parse_condition_rule(struct cfl_variant return NULL; } - if (value_var) { + if (!value_var) { + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + + if (value_var->type == CFL_VARIANT_ARRAY) { + struct cfl_array *array; + struct cfl_variant *entry; + size_t idx; + + array = value_var->data.as_array; + if (!array || cfl_array_size(array) == 0) { + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + + rule->values = flb_calloc(cfl_array_size(array), sizeof(flb_sds_t)); + if (!rule->values) { + flb_errno(); + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + + for (idx = 0; idx < cfl_array_size(array); idx++) { + entry = cfl_array_fetch_by_index(array, idx); + rule->values[idx] = variant_to_sds(entry); + if (!rule->values[idx]) { + size_t j; + + for (j = 0; j < idx; j++) { + flb_sds_destroy(rule->values[j]); + } + flb_free(rule->values); + flb_sds_destroy(rule->op); + flb_sds_destroy(rule->field); + flb_free(rule); + return NULL; + } + } + rule->values_count = cfl_array_size(array); + } + else { rule->value = variant_to_sds(value_var); - if (!rule->value && strcmp(rule->op, "exists") != 0) { + if (!rule->value) { flb_sds_destroy(rule->op); flb_sds_destroy(rule->field); flb_free(rule); @@ -563,6 +627,7 @@ static struct flb_route_condition *parse_condition(struct cfl_variant *variant, struct flb_route_condition *condition; struct cfl_variant *rules_var; struct cfl_variant *default_var; + struct cfl_variant *op_var; struct cfl_array *rules_array; struct cfl_variant *entry; struct flb_route_condition_rule *rule; @@ -579,9 +644,31 @@ static struct flb_route_condition *parse_condition(struct cfl_variant *variant, return NULL; } cfl_list_init(&condition->rules); + condition->op = FLB_COND_OP_AND; + condition->compiled = NULL; + condition->compiled_status = 0; rules_var = cfl_kvlist_fetch(variant->data.as_kvlist, "rules"); default_var = cfl_kvlist_fetch(variant->data.as_kvlist, "default"); + op_var = cfl_kvlist_fetch(variant->data.as_kvlist, "op"); + + if (op_var) { + if (op_var->type != CFL_VARIANT_STRING) { + route_condition_destroy(condition); + return NULL; + } + + if (strcasecmp(op_var->data.as_string, "and") == 0) { + condition->op = FLB_COND_OP_AND; + } + else if (strcasecmp(op_var->data.as_string, "or") == 0) { + condition->op = FLB_COND_OP_OR; + } + else { + route_condition_destroy(condition); + return NULL; + } + } if (default_var) { if (variant_to_bool(default_var, &val) != 0) { @@ -780,6 +867,7 @@ static int parse_route(struct cfl_variant *variant, cfl_list_init(&route->outputs); cfl_list_init(&route->processors); route->signals = signals; + route->per_record_routing = FLB_FALSE; // Default to false route->name = copy_from_cfl_sds(name_var->data.as_string); if (!route->name) { @@ -804,6 +892,15 @@ static int parse_route(struct cfl_variant *variant, } } + // Parse per_record_routing option + struct cfl_variant *per_record_var = cfl_kvlist_fetch(kvlist, "per_record_routing"); + if (per_record_var) { + int val; + if (variant_to_bool(per_record_var, &val) == 0) { + route->per_record_routing = val; + } + } + processors_var = cfl_kvlist_fetch(kvlist, "processors"); if (processors_var) { if (parse_processors(processors_var, &route->processors, config) != 0) { @@ -1109,15 +1206,15 @@ static struct flb_output_instance *find_output_instance(struct flb_config *confi static int input_has_direct_route(struct flb_input_instance *in, struct flb_output_instance *out) { - struct mk_list *head; + struct cfl_list *head; struct flb_router_path *path; if (!in || !out) { return FLB_FALSE; } - mk_list_foreach(head, &in->routes_direct) { - path = mk_list_entry(head, struct flb_router_path, _head); + cfl_list_foreach(head, &in->routes_direct) { + path = cfl_list_entry(head, struct flb_router_path, _head); if (path->ins == out) { return FLB_TRUE; } @@ -1153,6 +1250,7 @@ static int output_supports_signals(struct flb_output_instance *out, uint32_t sig int flb_router_apply_config(struct flb_config *config) { + int created = 0; struct cfl_list *input_head; struct cfl_list *route_head; struct cfl_list *output_head; @@ -1162,15 +1260,12 @@ int flb_router_apply_config(struct flb_config *config) struct flb_input_instance *input_ins; struct flb_output_instance *output_ins; struct flb_output_instance *fallback_ins; - int created; + struct flb_router_path *path; if (!config) { return 0; } - flb_debug("[router] applying router configuration"); - created = 0; - cfl_list_foreach(input_head, &config->input_routes) { input_routes = cfl_list_entry(input_head, struct flb_input_routes, _head); @@ -1220,6 +1315,8 @@ int flb_router_apply_config(struct flb_config *config) } if (flb_router_connect_direct(input_ins, output_ins) == 0) { + path = cfl_list_entry_last(&input_ins->routes_direct, struct flb_router_path, _head); + path->route = route; created++; flb_debug("[router] connected input '%s' route '%s' to output '%s'", flb_input_name(input_ins), diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index e10ad9a463a..4ff295871a3 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -144,7 +144,7 @@ int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, { return memcmp(routes_mask, config->route_empty_mask, - config->route_mask_size) == 0; + config->route_mask_size * sizeof(flb_route_mask_element)) == 0; } int flb_routes_empty_mask_create(struct flb_config *config) diff --git a/src/flb_sosreport.c b/src/flb_sosreport.c index 3db87bd1336..5074aa5aeb8 100644 --- a/src/flb_sosreport.c +++ b/src/flb_sosreport.c @@ -167,7 +167,7 @@ int flb_sosreport(struct flb_config *config) { char tmp[32]; struct mk_list *head; - struct mk_list *head_r; + struct cfl_list *head_r; struct flb_input_plugin *in; struct flb_filter_plugin *filter; struct flb_output_plugin *out; @@ -266,10 +266,10 @@ int flb_sosreport(struct flb_config *config) print_properties(&ins_in->properties); /* Fixed Routes */ - if (mk_list_is_empty(&ins_in->routes) != 0) { + if (!cfl_list_is_empty(&ins_in->routes)) { printf(" Routes\t\t"); - mk_list_foreach(head_r, &ins_in->routes) { - route = mk_list_entry(head_r, struct flb_router_path, _head); + cfl_list_foreach(head_r, &ins_in->routes) { + route = cfl_list_entry(head_r, struct flb_router_path, _head); printf("%s ", route->ins->name); } printf("\n"); diff --git a/src/flb_task.c b/src/flb_task.c index 75263ff70d4..a829b73abf3 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -360,14 +360,17 @@ struct flb_task *flb_task_create(uint64_t ref_id, { int count = 0; int total_events = 0; + int direct_count = 0; struct flb_task *task; struct flb_event_chunk *evc; struct flb_task_route *route; struct flb_router_path *route_path; struct flb_output_instance *o_ins; struct flb_input_chunk *task_ic; - struct mk_list *i_head; + struct cfl_list *i_head; struct mk_list *o_head; + struct flb_router_chunk_context router_context; + int router_context_initialized = FLB_FALSE; /* No error status */ *err = FLB_FALSE; @@ -392,6 +395,15 @@ struct flb_task *flb_task_create(uint64_t ref_id, return NULL; } + if (flb_router_chunk_context_init(&router_context) != 0) { + flb_error("[task] failed to initialize router chunk context"); + flb_event_chunk_destroy(evc); + flb_free(task); + *err = FLB_TRUE; + return NULL; + } + router_context_initialized = FLB_TRUE; + #ifdef FLB_HAVE_CHUNK_TRACE if (ic->trace) { flb_debug("add trace to task"); @@ -414,23 +426,56 @@ struct flb_task *flb_task_create(uint64_t ref_id, #endif /* Direct connects betweek input <> outputs (API based) */ - if (mk_list_size(&i_ins->routes_direct) > 0) { - mk_list_foreach(i_head, &i_ins->routes_direct) { - route_path = mk_list_entry(i_head, struct flb_router_path, _head); + if (cfl_list_size(&i_ins->routes_direct) > 0) { + direct_count = 0; + + cfl_list_foreach(i_head, &i_ins->routes_direct) { + route_path = cfl_list_entry(i_head, struct flb_router_path, _head); + + if (flb_router_path_should_route(task->event_chunk, + &router_context, + route_path) == FLB_FALSE) { + continue; + } + o_ins = route_path->ins; - route = flb_malloc(sizeof(struct flb_task_route)); + route = flb_calloc(1, sizeof(struct flb_task_route)); if (!route) { flb_errno(); + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } task->event_chunk->data = NULL; flb_task_destroy(task, FLB_TRUE); return NULL; } + route->status = FLB_TASK_ROUTE_INACTIVE; route->out = o_ins; mk_list_add(&route->_head, &task->routes); + direct_count++; + } + + if (direct_count == 0) { + flb_debug("[task] dropping direct task=%p id=%i without matching routes", + task, task->id); + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } + task->event_chunk->data = NULL; + flb_task_destroy(task, FLB_TRUE); + return NULL; + } + + flb_debug("[task] created direct task=%p id=%i with %i route(s)", + task, task->id, direct_count); + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; } - flb_debug("[task] created direct task=%p id=%i OK", task, task->id); return task; } @@ -444,7 +489,7 @@ struct flb_task *flb_task_create(uint64_t ref_id, continue; } - if (flb_routes_mask_get_bit(task_ic->routes_mask, + if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id, o_ins->config) != 0) { route = flb_calloc(1, sizeof(struct flb_task_route)); @@ -464,11 +509,19 @@ struct flb_task *flb_task_create(uint64_t ref_id, if (count == 0) { flb_debug("[task] created task=%p id=%i without routes, dropping.", task, task->id); + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } task->event_chunk->data = NULL; flb_task_destroy(task, FLB_TRUE); return NULL; } + if (router_context_initialized) { + flb_router_chunk_context_destroy(&router_context); + router_context_initialized = FLB_FALSE; + } flb_debug("[task] created task=%p id=%i OK", task, task->id); return task; } diff --git a/tests/internal/conditionals.c b/tests/internal/conditionals.c index 81df2059c1b..7ce49021fdc 100644 --- a/tests/internal/conditionals.c +++ b/tests/internal/conditionals.c @@ -876,6 +876,22 @@ void test_condition_missing_values() flb_condition_destroy(cond); destroy_test_record(record_data); + /* Test metadata rule when metadata is absent */ + record_data = create_test_record("message", "test log entry"); + TEST_CHECK(record_data != NULL); + + cond = flb_condition_create(FLB_COND_OP_AND); + TEST_CHECK(cond != NULL); + + TEST_CHECK(flb_condition_add_rule(cond, "$stream", FLB_RULE_OP_EQ, + "production", 0, RECORD_CONTEXT_METADATA) == FLB_TRUE); + + result = flb_condition_evaluate(cond, &record_data->chunk); + TEST_CHECK(result == FLB_FALSE); /* Missing metadata should return false */ + + flb_condition_destroy(cond); + destroy_test_record(record_data); + /* Test NOT_IN operator with present field not in array */ record_data = create_test_record("level", "info"); TEST_CHECK(record_data != NULL); @@ -1817,4 +1833,4 @@ TEST_LIST = { {"gte_lte_multiple", test_condition_gte_lte_multiple}, {"gte_lte_border_cases", test_condition_gte_lte_border_cases}, {NULL, NULL} -}; \ No newline at end of file +}; diff --git a/tests/internal/input_chunk.c b/tests/internal/input_chunk.c index 945ddbbf13b..773eb03cf9c 100644 --- a/tests/internal/input_chunk.c +++ b/tests/internal/input_chunk.c @@ -309,42 +309,6 @@ void flb_test_input_chunk_dropping_chunks() flb_destroy(ctx); } -/* - * When chunk is set to DOWN from memory, data_size is set to 0 and - * cio_chunk_get_content_size(1) returns the data_size. fs_chunks_size - * is used to track the size of chunks in filesystem so we need to call - * cio_chunk_get_real_size to return the original size in the file system - */ -static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic) -{ - ssize_t meta_size; - ssize_t size; - - size = cio_chunk_get_real_size(ic->chunk); - - if (size != 0) { - return size; - } - - // Real size is not synced to chunk yet - size = flb_input_chunk_get_size(ic); - if (size == 0) { - flb_debug("[input chunk] no data in the chunk %s", - flb_input_chunk_get_name(ic)); - return -1; - } - - meta_size = cio_meta_size(ic->chunk); - size += meta_size - /* See https://github.com/edsiper/chunkio#file-layout for more details */ - + 2 /* HEADER BYTES */ - + 4 /* CRC32 */ - + 16 /* PADDING */ - + 2; /* METADATA LENGTH BYTES */ - - return size; -} - static int gen_buf(msgpack_sbuffer *mp_sbuf, char *buf, size_t buf_size) { msgpack_unpacked result; diff --git a/tests/internal/router_config.c b/tests/internal/router_config.c index c479c978021..9f4c888b3d1 100644 --- a/tests/internal/router_config.c +++ b/tests/internal/router_config.c @@ -6,7 +6,9 @@ #include #include #include +#include #include +#include #include #include @@ -27,6 +29,105 @@ static struct cfl_variant *clone_variant(struct cfl_variant *var); +static int build_log_chunk(const char *level, + struct flb_log_event_encoder *encoder, + struct flb_event_chunk *chunk) +{ + int ret; + + if (!level || !encoder || !chunk) { + return -1; + } + + ret = flb_log_event_encoder_init(encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return -1; + } + + ret = flb_log_event_encoder_begin_record(encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + ret = flb_log_event_encoder_set_current_timestamp(encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + ret = flb_log_event_encoder_append_body_values( + encoder, + FLB_LOG_EVENT_STRING_VALUE("level", 5), + FLB_LOG_EVENT_CSTRING_VALUE(level)); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + ret = flb_log_event_encoder_commit_record(encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(encoder); + return -1; + } + + memset(chunk, 0, sizeof(*chunk)); + chunk->type = FLB_EVENT_TYPE_LOGS; + chunk->data = encoder->output_buffer; + chunk->size = encoder->output_length; + chunk->total_events = 1; + + return 0; +} + +static void free_route_condition(struct flb_route_condition *condition) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct flb_route_condition_rule *rule; + size_t idx; + + if (!condition) { + return; + } + + if (condition->compiled) { + flb_condition_destroy(condition->compiled); + } + + cfl_list_foreach_safe(head, tmp, &condition->rules) { + rule = cfl_list_entry(head, struct flb_route_condition_rule, _head); + cfl_list_del(&rule->_head); + + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + if (rule->value) { + flb_sds_destroy(rule->value); + } + if (rule->values) { + for (idx = 0; idx < rule->values_count; idx++) { + if (rule->values[idx]) { + flb_sds_destroy(rule->values[idx]); + } + } + flb_free(rule->values); + } + + flb_free(rule); + } + + flb_free(condition); +} + static struct cfl_array *clone_array(struct cfl_array *array) { struct cfl_array *copy; @@ -673,8 +774,8 @@ static void setup_test_instances(struct flb_config *config, memset(input, 0, sizeof(struct flb_input_instance)); mk_list_init(&input->_head); - mk_list_init(&input->routes_direct); - mk_list_init(&input->routes); + cfl_list_init(&input->routes_direct); + cfl_list_init(&input->routes); mk_list_init(&input->tasks); mk_list_init(&input->chunks); mk_list_init(&input->collectors); @@ -742,9 +843,9 @@ void test_router_apply_config_success() cfl_list_add(&route_output._head, &route.outputs); TEST_CHECK(flb_router_apply_config(&config) == 0); - TEST_CHECK(mk_list_size(&input.routes_direct) == 1); + TEST_CHECK(cfl_list_size(&input.routes_direct) == 1); - path = mk_list_entry(input.routes_direct.next, struct flb_router_path, _head); + path = cfl_list_entry(input.routes_direct.next, struct flb_router_path, _head); TEST_CHECK(path->ins == &output); flb_router_exit(&config); @@ -790,8 +891,8 @@ void test_router_apply_config_missing_output() TEST_CHECK(flb_router_apply_config(&config) == 0); - /* note mk_list_is_empty return 0 if is empty */ - TEST_CHECK(!mk_list_is_empty(&input.routes_direct)); + /* When output is missing, no routes should be created */ + TEST_CHECK(cfl_list_is_empty(&input.routes_direct)); flb_router_exit(&config); @@ -838,7 +939,7 @@ void test_router_route_default_precedence() memset(&chunk, 0, sizeof(chunk)); chunk.type = FLB_EVENT_TYPE_LOGS; - TEST_CHECK(flb_route_condition_eval(&chunk, route) == FLB_TRUE); + TEST_CHECK(flb_route_condition_eval(&chunk, NULL, route) == FLB_TRUE); output = cfl_list_entry(route->outputs.next, struct flb_route_output, _head); TEST_CHECK(strcmp(output->name, "lib_route") == 0); @@ -850,6 +951,275 @@ void test_router_route_default_precedence() flb_cf_destroy(cf); } +static void test_router_condition_eval_logs_match() +{ + struct flb_route route; + struct flb_route_condition *condition; + struct flb_route_condition_rule *rule; + struct flb_log_event_encoder encoder; + struct flb_event_chunk chunk; + struct flb_router_chunk_context context; + int ret; + + memset(&route, 0, sizeof(route)); + cfl_list_init(&route.outputs); + cfl_list_init(&route.processors); + + condition = flb_calloc(1, sizeof(struct flb_route_condition)); + TEST_CHECK(condition != NULL); + if (!condition) { + return; + } + + cfl_list_init(&condition->rules); + condition->op = FLB_COND_OP_AND; + condition->compiled_status = 0; + condition->compiled = NULL; + condition->is_default = FLB_FALSE; + + rule = flb_calloc(1, sizeof(struct flb_route_condition_rule)); + TEST_CHECK(rule != NULL); + if (!rule) { + free_route_condition(condition); + return; + } + + cfl_list_init(&rule->_head); + rule->field = flb_sds_create("$level"); + rule->op = flb_sds_create("eq"); + rule->value = flb_sds_create("error"); + TEST_CHECK(rule->field != NULL && rule->op != NULL && rule->value != NULL); + if (!rule->field || !rule->op || !rule->value) { + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + if (rule->value) { + flb_sds_destroy(rule->value); + } + flb_free(rule); + free_route_condition(condition); + return; + } + + cfl_list_add(&rule->_head, &condition->rules); + + route.condition = condition; + route.signals = FLB_ROUTER_SIGNAL_LOGS; + + flb_router_chunk_context_init(&context); + + ret = build_log_chunk("error", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_condition_eval_logs(&chunk, &context, &route) == FLB_TRUE); + } + flb_router_chunk_context_reset(&context); + flb_log_event_encoder_destroy(&encoder); + + ret = build_log_chunk("info", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_condition_eval_logs(&chunk, &context, &route) == FLB_FALSE); + } + flb_router_chunk_context_reset(&context); + flb_log_event_encoder_destroy(&encoder); + + flb_router_chunk_context_destroy(&context); + free_route_condition(condition); +} + +static void test_router_condition_eval_logs_in_operator() +{ + struct flb_route route; + struct flb_route_condition *condition; + struct flb_route_condition_rule *rule; + struct flb_log_event_encoder encoder; + struct flb_event_chunk chunk; + struct flb_router_chunk_context context; + int ret; + + memset(&route, 0, sizeof(route)); + cfl_list_init(&route.outputs); + cfl_list_init(&route.processors); + + condition = flb_calloc(1, sizeof(struct flb_route_condition)); + TEST_CHECK(condition != NULL); + if (!condition) { + return; + } + + cfl_list_init(&condition->rules); + condition->op = FLB_COND_OP_AND; + condition->compiled_status = 0; + condition->compiled = NULL; + condition->is_default = FLB_FALSE; + + rule = flb_calloc(1, sizeof(struct flb_route_condition_rule)); + TEST_CHECK(rule != NULL); + if (!rule) { + free_route_condition(condition); + return; + } + + cfl_list_init(&rule->_head); + rule->field = flb_sds_create("$level"); + rule->op = flb_sds_create("in"); + TEST_CHECK(rule->field != NULL && rule->op != NULL); + if (!rule->field || !rule->op) { + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + flb_free(rule); + free_route_condition(condition); + return; + } + + rule->values_count = 2; + rule->values = flb_calloc(rule->values_count, sizeof(flb_sds_t)); + TEST_CHECK(rule->values != NULL); + if (!rule->values) { + free_route_condition(condition); + flb_sds_destroy(rule->field); + flb_sds_destroy(rule->op); + flb_free(rule); + return; + } + + rule->values[0] = flb_sds_create("error"); + rule->values[1] = flb_sds_create("fatal"); + TEST_CHECK(rule->values[0] != NULL && rule->values[1] != NULL); + if (!rule->values[0] || !rule->values[1]) { + if (rule->values[0]) { + flb_sds_destroy(rule->values[0]); + } + if (rule->values[1]) { + flb_sds_destroy(rule->values[1]); + } + flb_free(rule->values); + flb_sds_destroy(rule->field); + flb_sds_destroy(rule->op); + flb_free(rule); + free_route_condition(condition); + return; + } + + cfl_list_add(&rule->_head, &condition->rules); + + route.condition = condition; + route.signals = FLB_ROUTER_SIGNAL_LOGS; + + flb_router_chunk_context_init(&context); + + ret = build_log_chunk("fatal", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_condition_eval_logs(&chunk, &context, &route) == FLB_TRUE); + } + flb_router_chunk_context_reset(&context); + flb_log_event_encoder_destroy(&encoder); + + ret = build_log_chunk("debug", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_condition_eval_logs(&chunk, &context, &route) == FLB_FALSE); + } + flb_router_chunk_context_reset(&context); + flb_log_event_encoder_destroy(&encoder); + + flb_router_chunk_context_destroy(&context); + free_route_condition(condition); +} + +static void test_router_path_should_route_condition() +{ + struct flb_router_path path; + struct flb_route route; + struct flb_route_condition *condition; + struct flb_route_condition_rule *rule; + struct flb_log_event_encoder encoder; + struct flb_event_chunk chunk; + struct flb_router_chunk_context context; + int ret; + + memset(&route, 0, sizeof(route)); + cfl_list_init(&route.outputs); + cfl_list_init(&route.processors); + + condition = flb_calloc(1, sizeof(struct flb_route_condition)); + TEST_CHECK(condition != NULL); + if (!condition) { + return; + } + + cfl_list_init(&condition->rules); + condition->op = FLB_COND_OP_AND; + condition->compiled_status = 0; + condition->compiled = NULL; + condition->is_default = FLB_FALSE; + + rule = flb_calloc(1, sizeof(struct flb_route_condition_rule)); + TEST_CHECK(rule != NULL); + if (!rule) { + free_route_condition(condition); + return; + } + + cfl_list_init(&rule->_head); + rule->field = flb_sds_create("$level"); + rule->op = flb_sds_create("eq"); + rule->value = flb_sds_create("error"); + TEST_CHECK(rule->field != NULL && rule->op != NULL && rule->value != NULL); + if (!rule->field || !rule->op || !rule->value) { + if (rule->field) { + flb_sds_destroy(rule->field); + } + if (rule->op) { + flb_sds_destroy(rule->op); + } + if (rule->value) { + flb_sds_destroy(rule->value); + } + flb_free(rule); + free_route_condition(condition); + return; + } + + cfl_list_add(&rule->_head, &condition->rules); + + route.condition = condition; + route.signals = FLB_ROUTER_SIGNAL_LOGS; + + memset(&path, 0, sizeof(path)); + path.route = &route; + + flb_router_chunk_context_init(&context); + + ret = build_log_chunk("error", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_router_path_should_route(&chunk, &context, &path) == FLB_TRUE); + } + flb_router_chunk_context_reset(&context); + flb_log_event_encoder_destroy(&encoder); + + ret = build_log_chunk("info", &encoder, &chunk); + TEST_CHECK(ret == 0); + if (ret == 0) { + TEST_CHECK(flb_router_path_should_route(&chunk, &context, &path) == FLB_FALSE); + } + flb_router_chunk_context_reset(&context); + flb_log_event_encoder_destroy(&encoder); + + flb_router_chunk_context_destroy(&context); + free_route_condition(condition); +} + TEST_LIST = { { "parse_basic", test_router_config_parse_basic }, { "duplicate_route", test_router_config_duplicate_route }, @@ -859,5 +1229,8 @@ TEST_LIST = { { "apply_config_success", test_router_apply_config_success }, { "apply_config_missing_output", test_router_apply_config_missing_output }, { "route_default_precedence", test_router_route_default_precedence }, + { "condition_eval_logs_match", test_router_condition_eval_logs_match }, + { "condition_eval_logs_in_operator", test_router_condition_eval_logs_in_operator }, + { "path_should_route_condition", test_router_path_should_route_condition }, { 0 } };