diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 10fe72a48b8..4ee6ec1afbc 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -31,6 +31,8 @@ #include +struct flb_router; + #define FLB_CONFIG_FLUSH_SECS 1 #define FLB_CONFIG_HTTP_LISTEN "0.0.0.0" #define FLB_CONFIG_HTTP_PORT "2020" @@ -293,9 +295,7 @@ struct flb_config { int hot_reload_watchdog_timeout_seconds; /* Routing */ - size_t route_mask_size; - size_t route_mask_slots; - uint64_t *route_empty_mask; + struct flb_router *router; #ifdef FLB_SYSTEM_WINDOWS /* maxstdio (Windows) */ int win_maxstdio; diff --git a/include/fluent-bit/flb_router.h b/include/fluent-bit/flb_router.h index c0d181e670e..f934ac7d8e3 100644 --- a/include/fluent-bit/flb_router.h +++ b/include/fluent-bit/flb_router.h @@ -28,6 +28,8 @@ #include #include #include +#include +#include #include struct flb_mp_chunk_cobj; @@ -47,6 +49,22 @@ struct flb_router_path { struct cfl_list _head; }; +struct flb_router { + /* Routing masks */ + size_t route_mask_size; + size_t route_mask_slots; + uint64_t *route_empty_mask; + + /* metrics context */ + struct cmt *cmt; + + /* logs routing metrics */ + struct cmt_counter *logs_records_total; + struct cmt_counter *logs_bytes_total; + struct cmt_counter *logs_drop_records_total; + struct cmt_counter *logs_drop_bytes_total; +}; + static inline int flb_router_match_type(int in_event_type, struct flb_output_instance *o_ins) { @@ -183,5 +201,9 @@ int flb_router_config_parse(struct flb_cf *cf, void flb_router_routes_destroy(struct cfl_list *input_routes); int flb_router_apply_config(struct flb_config *config); +int flb_router_metrics_create(struct flb_config *config, struct flb_router *router); +struct flb_router *flb_router_create(struct flb_config *config); +void flb_router_destroy(struct flb_router *router); + #endif diff --git a/include/fluent-bit/flb_routes_mask.h b/include/fluent-bit/flb_routes_mask.h index 3f47f549124..5be1914045e 100644 --- a/include/fluent-bit/flb_routes_mask.h +++ b/include/fluent-bit/flb_routes_mask.h @@ -46,23 +46,26 @@ typedef uint64_t flb_route_mask_element; /* forward declaration */ struct flb_input_instance; struct flb_config; +struct flb_router; int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, const char *tag, int tag_len, struct flb_input_instance *in); -int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config); -void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config); -void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config); -int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, - struct flb_config *config); +int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, + struct flb_router *router); +void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, + struct flb_router *router); +void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, + struct flb_router *router); +int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, + struct flb_router *router); -int flb_routes_empty_mask_create(struct flb_config *config); -void flb_routes_empty_mask_destroy(struct flb_config *config); +int flb_routes_empty_mask_create(struct flb_router *router); +void flb_routes_empty_mask_destroy(struct flb_router *router); -int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config); +int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router); +size_t flb_routes_mask_get_size(struct flb_router *router); +size_t flb_routes_mask_get_slots(struct flb_router *router); #endif diff --git a/plugins/in_storage_backlog/sb.c b/plugins/in_storage_backlog/sb.c index de3f0b4694e..5e0ecba8a45 100644 --- a/plugins/in_storage_backlog/sb.c +++ b/plugins/in_storage_backlog/sb.c @@ -281,12 +281,14 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun int tag_len; const char * tag_buf; int result; + size_t slots; memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk)); + slots = flb_routes_mask_get_slots(context->ins->config->router); memset(context->dummy_routes_mask, 0, - context->ins->config->route_mask_slots * sizeof(flb_route_mask_element)); + slots * sizeof(flb_route_mask_element)); dummy_input_chunk.in = context->ins; dummy_input_chunk.chunk = target_chunk; @@ -317,7 +319,7 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun backlog = mk_list_entry(head, struct sb_out_queue, _head); if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask, backlog->ins->id, - backlog->ins->config)) { + backlog->ins->config->router)) { result = sb_append_chunk_to_segregated_backlog(target_chunk, stream, chunk_size, backlog); if (result) { @@ -656,6 +658,7 @@ static int cb_sb_init(struct flb_input_instance *in, int ret; char mem[32]; struct flb_sb *ctx; + size_t slots; ctx = flb_calloc(1, sizeof(struct flb_sb)); @@ -664,7 +667,8 @@ static int cb_sb_init(struct flb_input_instance *in, return -1; } - ctx->dummy_routes_mask = flb_calloc(in->config->route_mask_slots, + slots = flb_routes_mask_get_slots(config->router); + ctx->dummy_routes_mask = flb_calloc(slots, sizeof(flb_route_mask_element)); if (ctx->dummy_routes_mask == NULL) { diff --git a/src/flb_config.c b/src/flb_config.c index 268ce8ce9d0..b75c83b387a 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -304,7 +304,36 @@ struct flb_config *flb_config_init() } /* Routing */ - flb_routes_mask_set_size(1, config); + config->router = flb_router_create(config); + if (!config->router) { + flb_error("[config] could not create router"); + if (config->kernel) { + flb_kernel_destroy(config->kernel); + } +#ifdef FLB_HAVE_HTTP_SERVER + if (config->http_listen) { + flb_free(config->http_listen); + } + + if (config->http_port) { + flb_free(config->http_port); + } +#endif + flb_cf_destroy(cf); + flb_free(config); + return NULL; + } + ret = flb_routes_mask_set_size(1, config->router); + if (ret != 0) { + flb_error("[config] routing mask dimensioning failed"); + flb_router_destroy(config->router); + if (config->kernel) { + flb_kernel_destroy(config->kernel); + } + flb_cf_destroy(cf); + flb_free(config); + return NULL; + } config->cio = NULL; config->storage_path = NULL; @@ -613,7 +642,8 @@ void flb_config_exit(struct flb_config *config) /* release task map */ flb_config_task_map_resize(config, 0); - flb_routes_empty_mask_destroy(config); + + flb_router_destroy(config->router); /* Clean up router input routes */ flb_router_routes_destroy(&config->input_routes); diff --git a/src/flb_engine.c b/src/flb_engine.c index d7fcd7a6223..c1ca75f8527 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -243,7 +244,8 @@ static inline int handle_output_event(uint64_t ts, uint32_t type; uint32_t key; double latency_seconds; - char *name; + char *in_name; + char *out_name; struct flb_task *task; struct flb_task_retry *retry; struct flb_output_instance *ins; @@ -289,7 +291,8 @@ static inline int handle_output_event(uint64_t ts, if (flb_output_is_threaded(ins) == FLB_FALSE) { flb_output_flush_finished(config, out_id); } - name = (char *) flb_output_name(ins); + in_name = (char *) flb_input_name(task->i_ins); + out_name = (char *) flb_output_name(ins); /* If we are in synchronous mode, flush the next waiting task */ if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) { @@ -302,16 +305,26 @@ static inline int handle_output_event(uint64_t ts, if (ret == FLB_OK) { /* cmetrics */ cmt_counter_add(ins->cmt_proc_records, ts, task->event_chunk->total_events, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_proc_bytes, ts, task->event_chunk->size, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (config->router && task->event_chunk->type == FLB_EVENT_TYPE_LOGS) { + cmt_counter_add(config->router->logs_records_total, ts, + task->event_chunk->total_events, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } /* latency histogram */ if (ins->cmt_latency) { latency_seconds = flb_time_now() - ((struct flb_input_chunk *) task->ic)->create_time; cmt_histogram_observe(ins->cmt_latency, ts, latency_seconds, 2, - (char *[]) {(char *) flb_input_name(task->i_ins), name}); + (char *[]) {in_name, out_name}); } /* [OLD API] Update metrics */ @@ -346,7 +359,7 @@ static inline int handle_output_event(uint64_t ts, cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); flb_task_retry_clean(task, ins); flb_task_users_dec(task, FLB_TRUE); @@ -355,11 +368,22 @@ static inline int handle_output_event(uint64_t ts, if (ins->retry_limit == FLB_OUT_RETRY_NONE) { /* cmetrics: output_dropped_records_total */ cmt_counter_add(ins->cmt_dropped_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (config->router && task->event_chunk && + task->event_chunk->type == FLB_EVENT_TYPE_LOGS) { + cmt_counter_add(config->router->logs_drop_records_total, ts, + task->records, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_drop_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD metrics API */ #ifdef FLB_HAVE_METRICS @@ -389,13 +413,24 @@ static inline int handle_output_event(uint64_t ts, */ /* cmetrics */ - cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name}); + cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (config->router && task->event_chunk && + task->event_chunk->type == FLB_EVENT_TYPE_LOGS) { + cmt_counter_add(config->router->logs_drop_records_total, ts, + task->records, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_drop_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD metrics API */ #ifdef FLB_HAVE_METRICS @@ -449,13 +484,13 @@ static inline int handle_output_event(uint64_t ts, flb_output_name(ins), out_id); /* cmetrics */ - cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {name}); + cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_retried_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD metrics API: update the metrics since a new retry is coming */ #ifdef FLB_HAVE_METRICS @@ -466,13 +501,24 @@ static inline int handle_output_event(uint64_t ts, } else if (ret == FLB_ERROR) { /* cmetrics */ - cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name}); + cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {out_name}); cmt_counter_add(ins->cmt_dropped_records, ts, task->records, - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); + + if (config->router && task->event_chunk && + task->event_chunk->type == FLB_EVENT_TYPE_LOGS) { + cmt_counter_add(config->router->logs_drop_records_total, ts, + task->records, + 2, (char *[]) {in_name, out_name}); + + cmt_counter_add(config->router->logs_drop_bytes_total, ts, + task->event_chunk->size, + 2, (char *[]) {in_name, out_name}); + } cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, calculate_chunk_capacity_percent(ins), - 1, (char *[]) {name}); + 1, (char *[]) {out_name}); /* OLD API */ #ifdef FLB_HAVE_METRICS @@ -811,14 +857,7 @@ int flb_engine_start(struct flb_config *config) config->notification_channels_initialized = FLB_TRUE; config->notification_event.type = FLB_ENGINE_EV_NOTIFICATION; - ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config); - - if (ret != 0) { - flb_error("[engine] routing mask dimensioning failed"); - return -1; - } - - ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config); + ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config->router); if (ret != 0) { flb_error("[engine] routing mask dimensioning failed"); diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 656a17815bc..3c0e343dde1 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -296,6 +296,7 @@ static int flb_input_chunk_release_space( { struct mk_list *input_chunk_iterator_tmp; struct mk_list *input_chunk_iterator; + struct flb_router *router; ssize_t dropped_record_count; int chunk_destroy_flag; struct flb_input_chunk *old_input_chunk; @@ -312,7 +313,7 @@ static int flb_input_chunk_release_space( if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, output_plugin->id, - input_plugin->config)) { + input_plugin->config->router)) { continue; } @@ -335,14 +336,14 @@ static int flb_input_chunk_release_space( if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) { flb_routes_mask_clear_bit(old_input_chunk->routes_mask, output_plugin->id, - input_plugin->config); + input_plugin->config->router); FS_CHUNK_SIZE_DEBUG_MOD(output_plugin, old_input_chunk, chunk_size); output_plugin->fs_chunks_size -= chunk_size; chunk_destroy_flag = flb_routes_mask_is_empty( old_input_chunk->routes_mask, - input_plugin->config); + input_plugin->config->router); chunk_released = FLB_TRUE; } @@ -364,6 +365,25 @@ static int flb_input_chunk_release_space( dropped_record_count, 1, (char *[]) {(char *) flb_output_name(output_plugin)}); + if (input_plugin->config && input_plugin->config->router && + old_input_chunk->event_type == FLB_INPUT_LOGS) { + router = input_plugin->config->router; + + cmt_counter_add(router->logs_drop_records_total, + cfl_time_now(), + (double) dropped_record_count, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + + cmt_counter_add(router->logs_drop_bytes_total, + cfl_time_now(), + (double) chunk_size, + 2, + (char *[]){(char *) flb_input_name(old_input_chunk->in), + (char *) flb_output_name(output_plugin)}); + } + flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, dropped_record_count, output_plugin->metrics); @@ -561,7 +581,7 @@ static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic, */ if (flb_routes_mask_get_bit(old_ic->routes_mask, o_id, - ic->in->config) == 0) { + ic->in->config->router) == 0) { return FLB_FALSE; } @@ -666,7 +686,7 @@ int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic, if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 || (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) == 0)) { + o_ins->config->router) == 0)) { continue; } @@ -708,7 +728,7 @@ int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic, if ((o_ins->total_limit_size == -1) || (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) == 0)) { + o_ins->config->router) == 0)) { continue; } @@ -750,7 +770,7 @@ int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_siz } } return !flb_routes_mask_is_empty(ic->routes_mask, - i_ins->config); + i_ins->config->router); } static int input_chunk_collect_output_references(struct flb_config *config, @@ -916,6 +936,7 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, size_t direct_match_count; size_t direct_match_index; struct flb_input_chunk *ic; + size_t mask_size; records = 0; @@ -940,8 +961,9 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, return NULL; } + mask_size = flb_routes_mask_get_size(in->config->router); ic->routes_mask = (flb_route_mask_element *) - flb_calloc(in->config->route_mask_size, + flb_calloc(mask_size, sizeof(flb_route_mask_element)); if (ic->routes_mask == NULL) { @@ -1121,7 +1143,7 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, if (direct_missing == FLB_FALSE) { memset(ic->routes_mask, 0, - sizeof(flb_route_mask_element) * in->config->route_mask_size); + sizeof(flb_route_mask_element) * mask_size); has_routes = 0; for (direct_index = 0; direct_index < direct_count; direct_index++) { direct_matches = NULL; @@ -1142,7 +1164,7 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, direct_match_index++) { flb_routes_mask_set_bit(ic->routes_mask, direct_matches[direct_match_index]->id, - in->config); + in->config->router); has_routes++; } @@ -1934,6 +1956,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in int err; int set_down = FLB_FALSE; int has_routes; + size_t mask_size; char name[64]; struct cio_chunk *chunk; struct flb_storage_input *storage; @@ -1998,8 +2021,9 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in #ifdef FLB_HAVE_METRICS ic->total_records = 0; #endif + mask_size = flb_routes_mask_get_size(in->config->router); ic->routes_mask = (flb_route_mask_element *) - flb_calloc(in->config->route_mask_size, + flb_calloc(mask_size, sizeof(flb_route_mask_element)); if (ic->routes_mask == NULL) { @@ -2063,7 +2087,7 @@ int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic, if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { if (ic->fs_counted == FLB_TRUE) { FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); o_ins->fs_chunks_size -= bytes; @@ -2147,7 +2171,7 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del) if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { if (ic->fs_counted == FLB_TRUE) { FS_CHUNK_SIZE_DEBUG_MOD(o_ins, ic, -bytes); o_ins->fs_chunks_size -= bytes; @@ -2305,7 +2329,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks * (based in creation time) to get enough space for the incoming chunk. */ - if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config) + if (!flb_routes_mask_is_empty(ic->routes_mask, ic->in->config->router) && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) { /* * If the chunk is not newly created, the chunk might already have logs inside. @@ -2314,7 +2338,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, * the chunk. */ if (new_chunk || - flb_routes_mask_is_empty(ic->routes_mask, ic->in->config) == FLB_TRUE) { + flb_routes_mask_is_empty(ic->routes_mask, ic->in->config->router) == FLB_TRUE) { flb_input_chunk_destroy(ic, FLB_TRUE); } return NULL; @@ -3247,7 +3271,7 @@ void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { /* * if there is match on any index of 1's in the binary, it indicates * that the input chunk will flush to this output instance diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 9bc799d02ac..c74400cbe64 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -102,6 +102,7 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, size_t out_size = 0; size_t chunk_size_sz = 0; ssize_t chunk_size; + size_t size; int direct_count; int direct_index; int write_ret; @@ -155,7 +156,7 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, if (flb_routes_mask_get_bit(chunk->routes_mask, route_path->ins->id, - ins->config) == 0) { + ins->config->router) == 0) { continue; } @@ -174,11 +175,12 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, flb_routes_mask_clear_bit(chunk->routes_mask, route_path->ins->id, - ins->config); + ins->config->router); } } - memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * ins->config->route_mask_size); + size = flb_routes_mask_get_size(ins->config->router); + memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * size); cfl_list_foreach(head, &ins->routes_direct) { route_path = cfl_list_entry(head, struct flb_router_path, _head); @@ -192,10 +194,10 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, flb_routes_mask_set_bit(chunk->routes_mask, route_path->ins->id, - ins->config); + ins->config->router); } - if (flb_routes_mask_is_empty(chunk->routes_mask, ins->config) == FLB_TRUE) { + if (flb_routes_mask_is_empty(chunk->routes_mask, ins->config->router) == FLB_TRUE) { return -1; } @@ -224,7 +226,7 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, if (flb_routes_mask_get_bit(chunk->routes_mask, route_path->ins->id, - ins->config) == 0) { + ins->config->router) == 0) { continue; } @@ -250,7 +252,7 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins, if (flb_routes_mask_get_bit(chunk->routes_mask, route_path->ins->id, - ins->config) == 0) { + ins->config->router) == 0) { continue; } @@ -813,13 +815,13 @@ static void input_chunk_remove_conditional_routes(struct flb_input_instance *ins if (flb_routes_mask_get_bit(chunk->routes_mask, route_path->ins->id, - ins->config) == 0) { + ins->config->router) == 0) { continue; } flb_routes_mask_clear_bit(chunk->routes_mask, route_path->ins->id, - ins->config); + ins->config->router); if (route_path->ins->total_limit_size == -1 || chunk->fs_counted == FLB_FALSE) { diff --git a/src/flb_metrics_exporter.c b/src/flb_metrics_exporter.c index 9834ab96cb5..609c23ae433 100644 --- a/src/flb_metrics_exporter.c +++ b/src/flb_metrics_exporter.c @@ -34,6 +34,7 @@ #include #include #include +#include static int collect_inputs(msgpack_sbuffer *mp_sbuf, msgpack_packer *mp_pck, struct flb_config *ctx) @@ -308,6 +309,15 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx) } } + if (ctx->router && ctx->router->cmt) { + ret = cmt_cat(cmt, ctx->router->cmt); + if (ret == -1) { + flb_error("[metrics exporter] could not append routing metrics"); + cmt_destroy(cmt); + return NULL; + } + } + /* Pipeline metrics: input, filters, outputs */ mk_list_foreach(head, &ctx->inputs) { i = mk_list_entry(head, struct flb_input_instance, _head); diff --git a/src/flb_router.c b/src/flb_router.c index 47251d52023..5b9c0813853 100644 --- a/src/flb_router.c +++ b/src/flb_router.c @@ -25,6 +25,7 @@ #include #include #include +#include #ifdef FLB_HAVE_REGEX #include @@ -298,3 +299,99 @@ void flb_router_exit(struct flb_config *config) } } } + +static int router_metrics_create(struct flb_router *router) +{ + if (!router || !router->cmt) { + return -1; + } + + router->logs_records_total = cmt_counter_create(router->cmt, + "fluentbit", + "routing_logs", + "records_total", + "Total log records routed from input to output", + 2, + (char *[]) {"input", "output"}); + if (!router->logs_records_total) { + return -1; + } + + router->logs_bytes_total = cmt_counter_create(router->cmt, + "fluentbit", + "routing_logs", + "bytes_total", + "Total bytes routed from input to output (logs)", + 2, + (char *[]) {"input", "output"}); + if (!router->logs_bytes_total) { + return -1; + } + + router->logs_drop_records_total = cmt_counter_create(router->cmt, + "fluentbit", + "routing_logs", + "drop_records_total", + "Total log records dropped during routing", + 2, + (char *[]) {"input", "output"}); + if (!router->logs_drop_records_total) { + return -1; + } + + router->logs_drop_bytes_total = cmt_counter_create(router->cmt, + "fluentbit", + "routing_logs", + "drop_bytes_total", + "Total bytes dropped during routing (logs)", + 2, + (char *[]) {"input", "output"}); + if (!router->logs_drop_bytes_total) { + return -1; + } + + return 0; +} + +struct flb_router *flb_router_create(struct flb_config *config) +{ + int ret; + struct flb_router *router; + (void) config; + + router = flb_calloc(1, sizeof(struct flb_router)); + if (!router) { + flb_errno(); + return NULL; + } + + router->cmt = cmt_create(); + if (!router->cmt) { + flb_free(router); + return NULL; + } + + ret = router_metrics_create(router); + if (ret != 0) { + flb_error("[router] failed to create metrics"); + flb_router_destroy(router); + return NULL; + } + + return router; +} + +void flb_router_destroy(struct flb_router *router) +{ + if (!router) { + return; + } + + flb_routes_empty_mask_destroy(router); + + if (router->cmt) { + cmt_destroy(router->cmt); + } + + flb_free(router); +} diff --git a/src/flb_routes_mask.c b/src/flb_routes_mask.c index 4ff295871a3..42db8999d47 100644 --- a/src/flb_routes_mask.c +++ b/src/flb_routes_mask.c @@ -24,6 +24,24 @@ #include +size_t flb_routes_mask_get_size(struct flb_router *router) +{ + if (router == NULL) { + return 0; + } + + return router->route_mask_size; +} + +size_t flb_routes_mask_get_slots(struct flb_router *router) +{ + if (router == NULL) { + return 0; + } + + return router->route_mask_slots; +} + /* * Set the routes_mask for input chunk with a router_match on tag, return a * non-zero value if any routes matched @@ -34,17 +52,20 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, struct flb_input_instance *in) { int has_routes = 0; + size_t size; struct mk_list *o_head; struct flb_output_instance *o_ins; if (!in) { return 0; } + if (in->config == NULL || in->config->router == NULL) { + return 0; + } + /* Clear the bit field */ - memset(routes_mask, - 0, - sizeof(flb_route_mask_element) * - in->config->route_mask_size); + size = flb_routes_mask_get_size(in->config->router); + memset(routes_mask, 0, sizeof(flb_route_mask_element) * size); /* Find all matching routes for the given tag */ mk_list_foreach(o_head, &in->config->outputs) { @@ -58,7 +79,7 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, , NULL #endif )) { - flb_routes_mask_set_bit(routes_mask, o_ins->id, o_ins->config); + flb_routes_mask_set_bit(routes_mask, o_ins->id, o_ins->config->router); has_routes = 1; } } @@ -74,13 +95,17 @@ int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask, * */ void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config) + struct flb_router *router) { int index; uint64_t bit; - if (value < 0 || value >= config->route_mask_slots) { - flb_warn("[routes_mask] Can't set bit (%d) past limits of bitfield", + if (router == NULL) { + return; + } + + if (value < 0 || value >= router->route_mask_slots) { + flb_warn("[routes_mask] Can't clear bit (%d) past limits of bitfield", value); return; } @@ -98,12 +123,16 @@ void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value, * */ void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config) + struct flb_router *router) { int index; uint64_t bit; - if (value < 0 || value >= config->route_mask_slots) { + if (router == NULL) { + return; + } + + if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't set bit (%d) past limits of bitfield", value); return; @@ -123,12 +152,16 @@ void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value, * */ int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, - struct flb_config *config) + struct flb_router *router) { int index; uint64_t bit; - if (value < 0 || value >= config->route_mask_slots) { + if (router == NULL) { + return 0; + } + + if (value < 0 || value >= router->route_mask_slots) { flb_warn("[routes_mask] Can't get bit (%d) past limits of bitfield", value); return 0; @@ -140,47 +173,63 @@ int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value, } int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask, - struct flb_config *config) + struct flb_router *router) { + if (router == NULL || router->route_empty_mask == NULL) { + return 0; + } + return memcmp(routes_mask, - config->route_empty_mask, - config->route_mask_size * sizeof(flb_route_mask_element)) == 0; + router->route_empty_mask, + router->route_mask_size * sizeof(flb_route_mask_element)) == 0; } -int flb_routes_empty_mask_create(struct flb_config *config) +int flb_routes_empty_mask_create(struct flb_router *router) { - flb_routes_empty_mask_destroy(config); + if (router == NULL) { + return -1; + } + + flb_routes_empty_mask_destroy(router); - config->route_empty_mask = flb_calloc(config->route_mask_size, + router->route_empty_mask = flb_calloc(router->route_mask_size, sizeof(flb_route_mask_element)); - if (config->route_empty_mask == NULL) { + if (router->route_empty_mask == NULL) { return -1; } return 0; } -void flb_routes_empty_mask_destroy(struct flb_config *config) +void flb_routes_empty_mask_destroy(struct flb_router *router) { - if (config->route_empty_mask != NULL) { - flb_free(config->route_empty_mask); + if (router == NULL) { + return; + } - config->route_empty_mask = NULL; + if (router->route_empty_mask != NULL) { + flb_free(router->route_empty_mask); + + router->route_empty_mask = NULL; } } -int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config) +int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router) { + if (router == NULL) { + return -1; + } + if (mask_size < 1) { mask_size = 1; } - mask_size = (mask_size / FLB_ROUTES_MASK_ELEMENT_BITS) + - (mask_size % FLB_ROUTES_MASK_ELEMENT_BITS); + mask_size = (mask_size + FLB_ROUTES_MASK_ELEMENT_BITS - 1) / + FLB_ROUTES_MASK_ELEMENT_BITS; - config->route_mask_size = mask_size; - config->route_mask_slots = mask_size * FLB_ROUTES_MASK_ELEMENT_BITS; + router->route_mask_size = mask_size; + router->route_mask_slots = mask_size * FLB_ROUTES_MASK_ELEMENT_BITS; - return flb_routes_empty_mask_create(config); + return flb_routes_empty_mask_create(router); } diff --git a/src/flb_task.c b/src/flb_task.c index f38540e32f5..3e3db5ece73 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -734,7 +734,7 @@ struct flb_task *flb_task_create(uint64_t ref_id, if (task_ic->routes_mask) { if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id, - o_ins->config) == 0) { + o_ins->config->router) == 0) { continue; } } @@ -790,7 +790,7 @@ struct flb_task *flb_task_create(uint64_t ref_id, if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id, - o_ins->config) != 0) { + o_ins->config->router) != 0) { route = flb_calloc(1, sizeof(struct flb_task_route)); if (!route) { flb_errno(); diff --git a/tests/internal/conditional_routing.c b/tests/internal/conditional_routing.c index 430580e6bcf..38c4c19cf2a 100644 --- a/tests/internal/conditional_routing.c +++ b/tests/internal/conditional_routing.c @@ -315,7 +315,7 @@ void test_conditional_routing_per_record() /* Cleanup */ flb_router_exit(&config); - cleanup_conditional_routing_instances(&input, &output1, &output2, &output3, + cleanup_conditional_routing_instances(&config, &input, &output1, &output2, &output3, &input_routes, &route1, &route2, &route3, &route_output1, &route_output2, &route_output3); } @@ -374,7 +374,7 @@ void test_conditional_routing_default_route() /* Cleanup */ flb_router_exit(&config); - cleanup_conditional_routing_instances(&input, &output1, &output2, &output3, + cleanup_conditional_routing_instances(&config, &input, &output1, &output2, &output3, &input_routes, &route1, &route2, &route3, &route_output1, &route_output2, &route_output3); } @@ -414,17 +414,17 @@ void test_conditional_routing_route_mask() if (strcmp(record->level, "info") == 0) { /* Create a test chunk */ - chunk = flb_input_chunk_create(&input, "test_tag", 8, NULL, 0); + chunk = flb_input_chunk_create(&input, FLB_INPUT_LOGS, "test_tag", 8); TEST_CHECK(chunk != NULL); if (chunk) { /* Set route mask for info output only */ routes_mask = chunk->routes_mask; - flb_routes_mask_set_bit(routes_mask, output1.id, &config); + flb_routes_mask_set_bit(routes_mask, output1.id, config.router); /* Verify route mask is set correctly */ - TEST_CHECK(flb_routes_mask_get_bit(routes_mask, output1.id, &config) == 1); - TEST_CHECK(flb_routes_mask_get_bit(routes_mask, output2.id, &config) == 0); - TEST_CHECK(flb_routes_mask_get_bit(routes_mask, output3.id, &config) == 0); + TEST_CHECK(flb_routes_mask_get_bit(routes_mask, output1.id, config.router) == 1); + TEST_CHECK(flb_routes_mask_get_bit(routes_mask, output2.id, config.router) == 0); + TEST_CHECK(flb_routes_mask_get_bit(routes_mask, output3.id, config.router) == 0); flb_input_chunk_destroy(chunk); } @@ -433,7 +433,7 @@ void test_conditional_routing_route_mask() /* Cleanup */ flb_router_exit(&config); - cleanup_conditional_routing_instances(&input, &output1, &output2, &output3, + cleanup_conditional_routing_instances(&config, &input, &output1, &output2, &output3, &input_routes, &route1, &route2, &route3, &route_output1, &route_output2, &route_output3); } @@ -503,7 +503,7 @@ void test_conditional_routing_no_duplicates() /* Cleanup */ flb_router_exit(&config); - cleanup_conditional_routing_instances(&input, &output1, &output2, &output3, + cleanup_conditional_routing_instances(&config, &input, &output1, &output2, &output3, &input_routes, &route1, &route2, &route3, &route_output1, &route_output2, &route_output3); } @@ -814,6 +814,12 @@ static void setup_conditional_routing_instances(struct flb_config *config, mk_list_init(&config->outputs); cfl_list_init(&config->input_routes); + config->router = flb_router_create(config); + TEST_CHECK(config->router != NULL); + if (config->router) { + flb_routes_mask_set_size(1, config->router); + } + memset(input, 0, sizeof(struct flb_input_instance)); mk_list_init(&input->_head); cfl_list_init(&input->routes_direct); @@ -947,7 +953,8 @@ static int test_record_routing(struct flb_input_instance *input, return found; } -static void cleanup_conditional_routing_instances(struct flb_input_instance *input, +static void cleanup_conditional_routing_instances(struct flb_config *config, + struct flb_input_instance *input, struct flb_output_instance *output1, struct flb_output_instance *output2, struct flb_output_instance *output3, @@ -970,6 +977,11 @@ static void cleanup_conditional_routing_instances(struct flb_input_instance *inp flb_sds_destroy(route_output1->name); flb_sds_destroy(route_output2->name); flb_sds_destroy(route_output3->name); + + if (config && config->router) { + flb_router_destroy(config->router); + config->router = NULL; + } } TEST_LIST = { diff --git a/tests/internal/fuzzers/aws_credentials_fuzzer.c b/tests/internal/fuzzers/aws_credentials_fuzzer.c index d6ccb22e981..51df0a3ea96 100644 --- a/tests/internal/fuzzers/aws_credentials_fuzzer.c +++ b/tests/internal/fuzzers/aws_credentials_fuzzer.c @@ -18,6 +18,7 @@ */ #include +#include #include #include #include @@ -57,7 +58,7 @@ void fuzz_sts(const uint8_t *data, size_t size) { flb_sds_t s1 = flb_sts_uri(action, role_arn, session_name, external_id, identity_token); if (s1 != NULL) { - flb_sds_destroy(s1); + flb_sds_destroy(s1); } flb_free(action); @@ -76,13 +77,17 @@ void fuzz_sts(const uint8_t *data, size_t size) { void fuzz_http(const uint8_t *data, size_t size) { time_t expiration; struct flb_aws_credentials *creds = NULL; - - char *response = get_null_terminated(250, &data, &size); - creds = flb_parse_http_credentials(response, 250, &expiration); - if (creds != NULL) { - flb_aws_credentials_destroy(creds); + size_t response_len; + + response_len = (size > 250) ? 250 : size; + char *response = get_null_terminated(response_len, &data, &size); + if (response != NULL) { + creds = flb_parse_http_credentials(response, strlen(response), &expiration); + if (creds != NULL) { + flb_aws_credentials_destroy(creds); + } + flb_free(response); } - flb_free(response); } diff --git a/tests/internal/input_chunk_routes.c b/tests/internal/input_chunk_routes.c index 49ad951726a..489e04a0bf1 100644 --- a/tests/internal/input_chunk_routes.c +++ b/tests/internal/input_chunk_routes.c @@ -1,3 +1,7 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include "flb_tests_internal.h" + #include #include #include @@ -5,8 +9,17 @@ #include #include #include + +/* Dummy workaround: undefine input plugin macros to avoid redefinition warnings */ +#undef flb_plg_error +#undef flb_plg_warn +#undef flb_plg_info +#undef flb_plg_debug +#undef flb_plg_trace + #include #include +#include #include #include #include @@ -16,7 +29,6 @@ #include #include -#include "flb_tests_internal.h" #define TEST_STREAM_PATH "/tmp/flb-chunk-direct-test" #define TEST_STREAM_PATH_MATCH "/tmp/flb-chunk-direct-test-match" @@ -65,8 +77,18 @@ static int init_test_config(struct flb_config *config, return -1; } - ret = flb_routes_mask_set_size(64, config); + /* Create router context */ + config->router = flb_router_create(config); + if (config->router == NULL) { + flb_env_destroy(config->env); + config->env = NULL; + return -1; + } + + ret = flb_routes_mask_set_size(64, config->router); if (ret != 0) { + flb_router_destroy(config->router); + config->router = NULL; flb_env_destroy(config->env); config->env = NULL; return -1; @@ -241,7 +263,10 @@ static void cleanup_test_routing_scenario(struct flb_input_chunk *ic, in->net_config_map = NULL; } - flb_routes_empty_mask_destroy(config); + if (config->router) { + flb_router_destroy(config->router); + config->router = NULL; + } if (config->env) { flb_env_destroy(config->env); config->env = NULL; @@ -601,13 +626,13 @@ static void test_chunk_restore_alias_plugin_match_multiple() TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask, stdout_one.id, - &config) == 1); + config.router) == 1); TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask, stdout_two.id, - &config) == 1); + config.router) == 1); TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask, http_out.id, - &config) == 0); + config.router) == 0); cleanup: cleanup_test_routing_scenario(ic, &stdout_one, &stdout_two, &http_out, @@ -762,13 +787,13 @@ static void test_chunk_restore_alias_plugin_null_matches_all() TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask, stdout_one.id, - &config) == 1); + config.router) == 1); TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask, stdout_two.id, - &config) == 1); + config.router) == 1); TEST_CHECK(flb_routes_mask_get_bit(ic->routes_mask, http_out.id, - &config) == 1); + config.router) == 1); cleanup: cleanup_test_routing_scenario(ic, &stdout_one, &stdout_two, &http_out,