Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

#include <monkey/mk_core.h>

struct flb_router;

#define FLB_CONFIG_FLUSH_SECS 1
#define FLB_CONFIG_HTTP_LISTEN "0.0.0.0"
#define FLB_CONFIG_HTTP_PORT "2020"
Expand Down Expand Up @@ -293,9 +295,7 @@ struct flb_config {
int hot_reload_watchdog_timeout_seconds;

/* Routing */
size_t route_mask_size;
size_t route_mask_slots;
uint64_t *route_empty_mask;
struct flb_router *router;
#ifdef FLB_SYSTEM_WINDOWS
/* maxstdio (Windows) */
int win_maxstdio;
Expand Down
22 changes: 22 additions & 0 deletions include/fluent-bit/flb_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_conditionals.h>
#include <cfl/cfl.h>
#include <cmetrics/cmetrics.h>
#include <cmetrics/cmt_counter.h>
#include <monkey/mk_core.h>

struct flb_mp_chunk_cobj;
Expand All @@ -47,6 +49,22 @@ struct flb_router_path {
struct cfl_list _head;
};

struct flb_router {
/* Routing masks */
size_t route_mask_size;
size_t route_mask_slots;
uint64_t *route_empty_mask;

/* metrics context */
struct cmt *cmt;

/* logs routing metrics */
struct cmt_counter *logs_records_total;
struct cmt_counter *logs_bytes_total;
struct cmt_counter *logs_drop_records_total;
struct cmt_counter *logs_drop_bytes_total;
};

static inline int flb_router_match_type(int in_event_type,
struct flb_output_instance *o_ins)
{
Expand Down Expand Up @@ -183,5 +201,9 @@ int flb_router_config_parse(struct flb_cf *cf,
void flb_router_routes_destroy(struct cfl_list *input_routes);
int flb_router_apply_config(struct flb_config *config);

int flb_router_metrics_create(struct flb_config *config, struct flb_router *router);
struct flb_router *flb_router_create(struct flb_config *config);
void flb_router_destroy(struct flb_router *router);

#endif

25 changes: 14 additions & 11 deletions include/fluent-bit/flb_routes_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,26 @@ typedef uint64_t flb_route_mask_element;
/* forward declaration */
struct flb_input_instance;
struct flb_config;
struct flb_router;

int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask,
const char *tag,
int tag_len,
struct flb_input_instance *in);
int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value,
struct flb_config *config);
void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value,
struct flb_config *config);
void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value,
struct flb_config *config);
int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask,
struct flb_config *config);
int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value,
struct flb_router *router);
void flb_routes_mask_set_bit(flb_route_mask_element *routes_mask, int value,
struct flb_router *router);
void flb_routes_mask_clear_bit(flb_route_mask_element *routes_mask, int value,
struct flb_router *router);
int flb_routes_mask_is_empty(flb_route_mask_element *routes_mask,
struct flb_router *router);

int flb_routes_empty_mask_create(struct flb_config *config);
void flb_routes_empty_mask_destroy(struct flb_config *config);
int flb_routes_empty_mask_create(struct flb_router *router);
void flb_routes_empty_mask_destroy(struct flb_router *router);

int flb_routes_mask_set_size(size_t mask_size, struct flb_config *config);
int flb_routes_mask_set_size(size_t mask_size, struct flb_router *router);
size_t flb_routes_mask_get_size(struct flb_router *router);
size_t flb_routes_mask_get_slots(struct flb_router *router);

#endif
10 changes: 7 additions & 3 deletions plugins/in_storage_backlog/sb.c
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,14 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun
int tag_len;
const char * tag_buf;
int result;
size_t slots;

memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk));

slots = flb_routes_mask_get_slots(context->ins->config->router);
memset(context->dummy_routes_mask,
0,
context->ins->config->route_mask_slots * sizeof(flb_route_mask_element));
slots * sizeof(flb_route_mask_element));

Comment on lines +284 to 292
Copy link

@coderabbitai coderabbitai bot Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Over-allocation: using slots (bits) instead of size (elements) for mask operations.

flb_routes_mask_get_slots() returns bit-count; you’re allocating/clearing 64× more than needed. Use flb_routes_mask_get_size() for element count.

Apply this fix:

-    size_t                  slots;
+    size_t                  mask_size;
@@
-    slots = flb_routes_mask_get_slots(context->ins->config->router);
-    memset(context->dummy_routes_mask,
-           0,
-           slots * sizeof(flb_route_mask_element));
+    mask_size = flb_routes_mask_get_size(context->ins->config->router);
+    memset(context->dummy_routes_mask, 0,
+           mask_size * sizeof(flb_route_mask_element));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
size_t slots;
memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk));
slots = flb_routes_mask_get_slots(context->ins->config->router);
memset(context->dummy_routes_mask,
0,
context->ins->config->route_mask_slots * sizeof(flb_route_mask_element));
slots * sizeof(flb_route_mask_element));
size_t mask_size;
memset(&dummy_input_chunk, 0, sizeof(struct flb_input_chunk));
mask_size = flb_routes_mask_get_size(context->ins->config->router);
memset(context->dummy_routes_mask, 0,
mask_size * sizeof(flb_route_mask_element));
🤖 Prompt for AI Agents
In plugins/in_storage_backlog/sb.c around lines 284 to 292, the code uses
flb_routes_mask_get_slots() (bit count) to compute the number of elements for
allocation/clearing, causing a 64x over-allocation; replace that call with
flb_routes_mask_get_size() to get the actual element count and use it when
computing the byte length for memset (elements * sizeof(flb_route_mask_element))
so the mask is cleared/allocated with the correct size.

Copy link
Contributor

@cosmo0920 cosmo0920 Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it could be an optimized way. But I have a question. If we use this way to allocate heap memories, it'll need to reallocate to fix the size of heap memories for route masks? This is because it could be using a possibly bigger size of allocation here, but the re-allocate costs could be high or any other re-allocating places are implemented to extend/shrink the size of heap memories that are using for route masks?
If there's no code path to re-allocate/adjust places for them, it could be worthless suggestion.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash

# First, let's examine the implementation of flb_routes_mask_get_slots() and flb_routes_mask_get_size()
echo "=== Implementation of flb_routes_mask_get_size() and flb_routes_mask_get_slots() ==="
ast-grep --pattern 'flb_routes_mask_get_size($$$) {
  $$$
}'

ast-grep --pattern 'flb_routes_mask_get_slots($$$) {
  $$$
}'

Length of output: 84


🏁 Script executed:

#!/bin/bash

# Let's look at the source file for these functions
echo "=== Contents of src/flb_routes_mask.c for these functions ==="
rg -A 10 "^size_t flb_routes_mask_get_(size|slots)" src/flb_routes_mask.c

Length of output: 444


🏁 Script executed:

#!/bin/bash

# Let's see how dummy_routes_mask is allocated in cb_sb_init
echo "=== Allocation of dummy_routes_mask in cb_sb_init ==="
rg -B 2 -A 5 "ctx->dummy_routes_mask = flb_calloc" plugins/in_storage_backlog/sb.c

Length of output: 316


🏁 Script executed:

#!/bin/bash

# Let's check other places in the codebase where routes_mask is allocated or sized
echo "=== Other allocations using flb_routes_mask_get_size ==="
rg -n "flb_routes_mask_get_size.*config->router" --type c -C 2

echo ""
echo "=== Other allocations using flb_routes_mask_get_slots ==="
rg -n "flb_routes_mask_get_slots.*config->router" --type c -C 2

Length of output: 2028


🏁 Script executed:

#!/bin/bash

# Let's examine the struct definition to understand what route_mask_slots and route_mask_size mean
echo "=== Router struct definition ==="
rg -A 15 "^struct flb_router" include/fluent-bit/flb_router.h

Length of output: 856


@cosmo0920, great question! Let me clarify - there's no reallocation happening here, and this is actually a bug fix, not just an optimization.

The Issue

The semantic difference between the two functions:

  • route_mask_slots = number of bits (i.e., max number of outputs/routes)
  • route_mask_size = number of elements (uint64_t) = ceil(slots / 64)

Since each flb_route_mask_element (uint64_t) holds 64 bits, if you have 3 outputs, you need:

  • route_mask_slots = 3 (bits)
  • route_mask_size = 1 (element, since 3 bits fit in one uint64_t)

Current Bug

The storage_backlog code currently:

  • Allocates slots elements (3 elements = 192 bytes) when it only needs size elements (1 element = 8 bytes)
  • Clears slots elements in the loop when it only needs to clear size elements

This is a ~64× over-allocation and over-clearing, not a design choice.

No Reallocation Occurs

The lifecycle is simple:

  1. Allocate once in cb_sb_init (line 671)
  2. Clear and reuse in sb_append_chunk_to_segregated_backlogs (line 289) — called in a loop
  3. Free once in cb_sb_exit

The mask size is determined at router initialization and never changes. There's no code path that reallocates or adjusts this buffer.

Consistency with Rest of Codebase

Every other allocation/clear operation in the refactored code uses get_size():

  • src/flb_input_chunk.c line 964 & 2024: allocations use get_size()
  • src/flb_input_log.c line 182: memset uses get_size()
  • src/flb_routes_mask.c line 67: memset uses get_size()

The storage_backlog plugin is the only place still using get_slots() incorrectly.

Recommendation

Apply the fix to match the rest of the codebase. It will:

  • ✅ Fix the over-allocation bug
  • ✅ Reduce repeated memset overhead in the loop
  • ✅ Make the code consistent with the router refactoring pattern
  • ✅ No reallocation costs (because there's no reallocation)

🧬 Code Graph Analysis Results

[src/flb_routes_mask.c] lines 36-43

size_t flb_routes_mask_get_slots(struct flb_router *router)
{
    if (router == NULL) {
        return 0;
    }

    return router->route_mask_slots;
}

[src/flb_routes_mask.c] lines 49-88

int flb_routes_mask_set_by_tag(flb_route_mask_element *routes_mask,
                               const char *tag,
                               int tag_len,
                               struct flb_input_instance *in)
{
    int has_routes = 0;
    size_t size;
    struct mk_list *o_head;
    struct flb_output_instance *o_ins;
    if (!in) {
        return 0;
    }

    if (in->config == NULL || in->config->router == NULL) {
        return 0;
    }

    /* Clear the bit field */
    size = flb_routes_mask_get_size(in->config->router);
    memset(routes_mask, 0, sizeof(flb_route_mask_element) * size);

    /* Find all matching routes for the given tag */
    mk_list_foreach(o_head, &in->config->outputs) {
        o_ins = mk_list_entry(o_head,
                              struct flb_output_instance, _head);

        if (flb_router_match(tag, tag_len, o_ins->match
#ifdef FLB_HAVE_REGEX
                             , o_ins->match_regex
#else
                             , NULL
#endif
                             )) {
            flb_routes_mask_set_bit(routes_mask, o_ins->id, o_ins->config->router);
            has_routes = 1;
        }
    }

    return has_routes;
}

[src/flb_routes_mask.c] lines 154-173

int flb_routes_mask_get_bit(flb_route_mask_element *routes_mask, int value,
                            struct flb_router *router)
{
    int index;
    uint64_t bit;

    if (router == NULL) {
        return 0;
    }

    if (value < 0 || value >= router->route_mask_slots) {
        flb_warn("[routes_mask] Can't get bit (%d) past limits of bitfield",
                 value);
        return 0;
    }

    index = value / FLB_ROUTES_MASK_ELEMENT_BITS;
    bit = 1ULL << (value % FLB_ROUTES_MASK_ELEMENT_BITS);
    return (routes_mask[index] & bit) != 0ULL;
}

[src/include/fluent-bit/flb_mem.h] lines 84-96

void *flb_calloc(size_t n, const size_t size) {
    if (size == 0) {
        return NULL;
    }
#ifdef FLB_HAVE_TESTS_OSSFUZZ
   // Add chance of failure. Used by fuzzing to test error-handling code.
   if (flb_fuzz_get_probability(1)) {
     return NULL;
   }
#endif

    return calloc(n, size);
}

[src/include/fluent-bit/flb_mem.h] lines 126-128

static inline void flb_free(void *ptr) {
    free(ptr);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edsiper This @coderabbitai's point of view could be correct for this patch. So, could you take a look on it?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cosmo0920, thank you for the review! I appreciate you confirming the analysis.

@edsiper, the issue is that both the allocation (line 671) and memset (line 289) in the storage_backlog plugin are using flb_routes_mask_get_slots() where they should use flb_routes_mask_get_size(). This causes a 64× over-allocation since slots counts bits while size counts uint64_t elements.

The fix is straightforward and makes the code consistent with the rest of the router refactoring. Happy to clarify anything if needed!

dummy_input_chunk.in = context->ins;
dummy_input_chunk.chunk = target_chunk;
Expand Down Expand Up @@ -317,7 +319,7 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun
backlog = mk_list_entry(head, struct sb_out_queue, _head);
if (flb_routes_mask_get_bit(dummy_input_chunk.routes_mask,
backlog->ins->id,
backlog->ins->config)) {
backlog->ins->config->router)) {
result = sb_append_chunk_to_segregated_backlog(target_chunk, stream,
chunk_size, backlog);
if (result) {
Expand Down Expand Up @@ -656,6 +658,7 @@ static int cb_sb_init(struct flb_input_instance *in,
int ret;
char mem[32];
struct flb_sb *ctx;
size_t slots;

ctx = flb_calloc(1, sizeof(struct flb_sb));

Expand All @@ -664,7 +667,8 @@ static int cb_sb_init(struct flb_input_instance *in,
return -1;
}

ctx->dummy_routes_mask = flb_calloc(in->config->route_mask_slots,
slots = flb_routes_mask_get_slots(config->router);
ctx->dummy_routes_mask = flb_calloc(slots,
sizeof(flb_route_mask_element));

if (ctx->dummy_routes_mask == NULL) {
Expand Down
34 changes: 32 additions & 2 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,36 @@ struct flb_config *flb_config_init()
}

/* Routing */
flb_routes_mask_set_size(1, config);
config->router = flb_router_create(config);
if (!config->router) {
flb_error("[config] could not create router");
if (config->kernel) {
flb_kernel_destroy(config->kernel);
}
#ifdef FLB_HAVE_HTTP_SERVER
if (config->http_listen) {
flb_free(config->http_listen);
}

if (config->http_port) {
flb_free(config->http_port);
}
#endif
flb_cf_destroy(cf);
flb_free(config);
return NULL;
}
ret = flb_routes_mask_set_size(1, config->router);
if (ret != 0) {
flb_error("[config] routing mask dimensioning failed");
flb_router_destroy(config->router);
if (config->kernel) {
flb_kernel_destroy(config->kernel);
}
flb_cf_destroy(cf);
flb_free(config);
return NULL;
}
Comment on lines +326 to +336
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Previous review concern addressed, but memory leak remains.

The return value of flb_routes_mask_set_size() is now properly checked and the router is correctly destroyed on failure, addressing the past review comment. However, the same memory leak for http_listen and http_port exists in this error path.

Apply this diff to fix the leak:

     ret = flb_routes_mask_set_size(1, config->router);
     if (ret != 0) {
         flb_error("[config] routing mask dimensioning failed");
         flb_router_destroy(config->router);
+#ifdef FLB_HAVE_HTTP_SERVER
+        if (config->http_listen) {
+            flb_free(config->http_listen);
+        }
+        if (config->http_port) {
+            flb_free(config->http_port);
+        }
+#endif
         if (config->kernel) {
             flb_kernel_destroy(config->kernel);
         }
         flb_cf_destroy(cf);
         flb_free(config);
         return NULL;
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ret = flb_routes_mask_set_size(1, config->router);
if (ret != 0) {
flb_error("[config] routing mask dimensioning failed");
flb_router_destroy(config->router);
if (config->kernel) {
flb_kernel_destroy(config->kernel);
}
flb_cf_destroy(cf);
flb_free(config);
return NULL;
}
ret = flb_routes_mask_set_size(1, config->router);
if (ret != 0) {
flb_error("[config] routing mask dimensioning failed");
flb_router_destroy(config->router);
#ifdef FLB_HAVE_HTTP_SERVER
if (config->http_listen) {
flb_free(config->http_listen);
}
if (config->http_port) {
flb_free(config->http_port);
}
#endif
if (config->kernel) {
flb_kernel_destroy(config->kernel);
}
flb_cf_destroy(cf);
flb_free(config);
return NULL;
}
🤖 Prompt for AI Agents
In src/flb_config.c around lines 317 to 327, the error path after
flb_routes_mask_set_size() fails correctly destroys the router and kernel but
leaks memory held by config->http_listen and config->http_port; free those two
string members (checking for non-NULL if desired) before calling
flb_free(config) (i.e., flb_free(config->http_listen);
flb_free(config->http_port);) so all allocated config resources are released on
this failure path.


config->cio = NULL;
config->storage_path = NULL;
Expand Down Expand Up @@ -613,7 +642,8 @@ void flb_config_exit(struct flb_config *config)

/* release task map */
flb_config_task_map_resize(config, 0);
flb_routes_empty_mask_destroy(config);

flb_router_destroy(config->router);

/* Clean up router input routes */
flb_router_routes_destroy(&config->input_routes);
Expand Down
89 changes: 64 additions & 25 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_engine_dispatch.h>
#include <fluent-bit/flb_network.h>
#include <fluent-bit/flb_task.h>
Expand Down Expand Up @@ -243,7 +244,8 @@ static inline int handle_output_event(uint64_t ts,
uint32_t type;
uint32_t key;
double latency_seconds;
char *name;
char *in_name;
char *out_name;
struct flb_task *task;
struct flb_task_retry *retry;
struct flb_output_instance *ins;
Expand Down Expand Up @@ -289,7 +291,8 @@ static inline int handle_output_event(uint64_t ts,
if (flb_output_is_threaded(ins) == FLB_FALSE) {
flb_output_flush_finished(config, out_id);
}
name = (char *) flb_output_name(ins);
in_name = (char *) flb_input_name(task->i_ins);
out_name = (char *) flb_output_name(ins);

/* If we are in synchronous mode, flush the next waiting task */
if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
Expand All @@ -302,16 +305,26 @@ static inline int handle_output_event(uint64_t ts,
if (ret == FLB_OK) {
/* cmetrics */
cmt_counter_add(ins->cmt_proc_records, ts, task->event_chunk->total_events,
1, (char *[]) {name});
1, (char *[]) {out_name});

cmt_counter_add(ins->cmt_proc_bytes, ts, task->event_chunk->size,
1, (char *[]) {name});
1, (char *[]) {out_name});

if (config->router && task->event_chunk->type == FLB_EVENT_TYPE_LOGS) {
cmt_counter_add(config->router->logs_records_total, ts,
task->event_chunk->total_events,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_bytes_total, ts,
task->event_chunk->size,
2, (char *[]) {in_name, out_name});
}

/* latency histogram */
if (ins->cmt_latency) {
latency_seconds = flb_time_now() - ((struct flb_input_chunk *) task->ic)->create_time;
cmt_histogram_observe(ins->cmt_latency, ts, latency_seconds, 2,
(char *[]) {(char *) flb_input_name(task->i_ins), name});
(char *[]) {in_name, out_name});
}

/* [OLD API] Update metrics */
Expand Down Expand Up @@ -346,7 +359,7 @@ static inline int handle_output_event(uint64_t ts,

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

flb_task_retry_clean(task, ins);
flb_task_users_dec(task, FLB_TRUE);
Expand All @@ -355,11 +368,22 @@ static inline int handle_output_event(uint64_t ts,
if (ins->retry_limit == FLB_OUT_RETRY_NONE) {
/* cmetrics: output_dropped_records_total */
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
1, (char *[]) {name});
1, (char *[]) {out_name});

if (config->router && task->event_chunk &&
task->event_chunk->type == FLB_EVENT_TYPE_LOGS) {
cmt_counter_add(config->router->logs_drop_records_total, ts,
task->records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_drop_bytes_total, ts,
task->event_chunk->size,
2, (char *[]) {in_name, out_name});
}

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

/* OLD metrics API */
#ifdef FLB_HAVE_METRICS
Expand Down Expand Up @@ -389,13 +413,24 @@ static inline int handle_output_event(uint64_t ts,
*/

/* cmetrics */
cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {name});
cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {out_name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
1, (char *[]) {name});
1, (char *[]) {out_name});

if (config->router && task->event_chunk &&
task->event_chunk->type == FLB_EVENT_TYPE_LOGS) {
cmt_counter_add(config->router->logs_drop_records_total, ts,
task->records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_drop_bytes_total, ts,
task->event_chunk->size,
2, (char *[]) {in_name, out_name});
}

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

/* OLD metrics API */
#ifdef FLB_HAVE_METRICS
Expand Down Expand Up @@ -449,13 +484,13 @@ static inline int handle_output_event(uint64_t ts,
flb_output_name(ins), out_id);

/* cmetrics */
cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {name});
cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {out_name});
cmt_counter_add(ins->cmt_retried_records, ts, task->records,
1, (char *[]) {name});
1, (char *[]) {out_name});

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

/* OLD metrics API: update the metrics since a new retry is coming */
#ifdef FLB_HAVE_METRICS
Expand All @@ -466,13 +501,24 @@ static inline int handle_output_event(uint64_t ts,
}
else if (ret == FLB_ERROR) {
/* cmetrics */
cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {name});
cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {out_name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
1, (char *[]) {name});
1, (char *[]) {out_name});

if (config->router && task->event_chunk &&
task->event_chunk->type == FLB_EVENT_TYPE_LOGS) {
cmt_counter_add(config->router->logs_drop_records_total, ts,
task->records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_drop_bytes_total, ts,
task->event_chunk->size,
2, (char *[]) {in_name, out_name});
}

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
calculate_chunk_capacity_percent(ins),
1, (char *[]) {name});
1, (char *[]) {out_name});

/* OLD API */
#ifdef FLB_HAVE_METRICS
Expand Down Expand Up @@ -811,14 +857,7 @@ int flb_engine_start(struct flb_config *config)
config->notification_channels_initialized = FLB_TRUE;
config->notification_event.type = FLB_ENGINE_EV_NOTIFICATION;

ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config);

if (ret != 0) {
flb_error("[engine] routing mask dimensioning failed");
return -1;
}

ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config);
ret = flb_routes_mask_set_size(mk_list_size(&config->outputs), config->router);

if (ret != 0) {
flb_error("[engine] routing mask dimensioning failed");
Expand Down
Loading
Loading