Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
c7f8426
task: persist route-effective records and bytes
edsiper Mar 3, 2026
8cbefb6
engine: use effective log records counter
edsiper Mar 3, 2026
4d2b608
out_forward: force non-log signals to forward mode
edsiper Mar 3, 2026
245961b
tests: runtime: out_forward non-log formatter coverage
edsiper Mar 3, 2026
6fb48aa
tests: runtime: add grouped log runtime counter semantics
edsiper Mar 3, 2026
7ab6556
tests: runtime: add e2e route counter parity coverage
edsiper Mar 3, 2026
373ec76
mp: add logical log record counting API
edsiper Mar 3, 2026
22bcc4f
input_log: use logical record counting for grouped logs
edsiper Mar 3, 2026
b6127f0
input_chunk: align grouped-log counters and route drop accounting
edsiper Mar 3, 2026
56372c4
filter: count grouped logs as logical records
edsiper Mar 3, 2026
2fd5360
processor: align grouped-log record accounting
edsiper Mar 3, 2026
dfd9221
output: use logical log record count in processed chunks
edsiper Mar 3, 2026
9c850bf
filter_alter_size: fix grouped-log counter usage
edsiper Mar 3, 2026
553289c
out_azure: align counters with logical log records
edsiper Mar 3, 2026
c5e4173
out_azure_kusto: align counters with logical log records
edsiper Mar 3, 2026
00cc77f
out_azure_logs_ingestion: use decoder-based grouped-log counting
edsiper Mar 3, 2026
72281d0
out_bigquery: align counters with logical log records
edsiper Mar 3, 2026
81ff0af
out_counter: report serialized events and logical log records
edsiper Mar 3, 2026
7d358e4
out_datadog: align formatter count fallback with logical logs
edsiper Mar 3, 2026
012620a
out_kafka_rest: align counters with logical log records
edsiper Mar 3, 2026
f6f0d29
out_logdna: align counters with logical log records
edsiper Mar 3, 2026
b4fd0ba
out_loki: align formatter count fallback with logical logs
edsiper Mar 3, 2026
ed2a224
out_nats: align counters with logical log records
edsiper Mar 3, 2026
e0e20be
out_nrlogs: align counters with logical log records
edsiper Mar 3, 2026
af5104e
out_oracle_log_analytics: align counters with logical logs
edsiper Mar 3, 2026
8e4263c
out_s3: align counters with logical log records
edsiper Mar 3, 2026
e9f14b9
out_skywalking: align counters with logical log records
edsiper Mar 3, 2026
47ff805
out_stackdriver: fix grouped-log counter parity
edsiper Mar 3, 2026
d460c10
tests: internal: input_chunk grouped-log counter regressions
edsiper Mar 3, 2026
4322eaf
tests: internal: input_chunk_routes grouped-log map counting regression
edsiper Mar 3, 2026
5e557ac
tests: internal: log_event_decoder grouped marker coverage
edsiper Mar 3, 2026
6ef2d76
tests: internal: processor grouped-log counter invariants
edsiper Mar 3, 2026
0826c46
tests: internal: task_map route data preservation across retries
edsiper Mar 3, 2026
1917e05
output: preserve non-log route records on return
edsiper Mar 4, 2026
e1a8822
input_chunk: preserve zero drops and use route drop bytes
edsiper Mar 4, 2026
bb18a89
out_counter: avoid total double counting
edsiper Mar 4, 2026
eb1214f
tests: internal: processor: harden grouped counter test path
edsiper Mar 4, 2026
c7f1335
out_counter: validate json output counters
edsiper Mar 10, 2026
1443e27
tests: internal: processor: harden grouped filter cleanup
edsiper Mar 10, 2026
d626641
mp: add grouped log counter helpers
edsiper Mar 10, 2026
dea1c89
output: expose grouped log counter values
edsiper Mar 10, 2026
dfdd683
processor: track grouped log counter semantics
edsiper Mar 10, 2026
ca59a46
tests: runtime: cover grouped log counter semantics
edsiper Mar 10, 2026
0f84fdd
tests: runtime: harden grouped counter parity setup
edsiper Mar 16, 2026
6f52e4c
tests: internal: processor: remove dead grouped cleanup branch
edsiper Mar 16, 2026
6313177
mp: fix grouped normalize loop iterator type
edsiper Mar 16, 2026
2c9eb39
mp: check grouped normalization result
edsiper Mar 18, 2026
7c07918
tests: internal: processor: avoid grouped buffer double free
edsiper Mar 18, 2026
270cd24
tests: runtime: harden grouped retry parity timing
edsiper Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/fluent-bit/flb_mp.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
#define FLB_MP_ARRAY MSGPACK_OBJECT_ARRAY

int flb_mp_count(const void *data, size_t bytes);
int flb_mp_count_log_records(const void *data, size_t bytes);
int flb_mp_normalize_log_buffer_groups_msgpack(const void *in_buf, size_t in_size,
char **out_buf, size_t *out_size);
int flb_mp_normalize_log_buffer_groups(const void *in_buf, size_t in_size,
char **out_buf, size_t *out_size);
int flb_mp_count_remaining(const void *data, size_t bytes, size_t *remaining_bytes);
int flb_mp_validate_log_chunk(const void *data, size_t bytes,
int *out_records, size_t *processed_bytes);
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_mp_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct flb_mp_chunk_cobj *flb_mp_chunk_cobj_create(struct flb_log_event_encoder
int flb_mp_chunk_cobj_destroy(struct flb_mp_chunk_cobj *chunk_cobj);

int flb_mp_chunk_cobj_encode(struct flb_mp_chunk_cobj *chunk_cobj, char **out_buf, size_t *out_size);
int flb_mp_chunk_cobj_count_log_records(struct flb_mp_chunk_cobj *chunk_cobj);
int flb_mp_chunk_cobj_normalize_groups(struct flb_mp_chunk_cobj *chunk_cobj);



Expand Down
40 changes: 37 additions & 3 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include <fluent-bit/flb_upstream_ha.h>
#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_processor.h>
#include <fluent-bit/flb_mp.h>

#include <cfl/cfl.h>
#include <cmetrics/cmetrics.h>
Expand Down Expand Up @@ -777,6 +778,9 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,

if (flb_processor_is_active(o_ins->processor)) {
if (evc->type == FLB_EVENT_TYPE_LOGS) {
char *normalized_buf;
size_t normalized_size;

/* run the processor */
ret = flb_processor_run(o_ins->processor,
0,
Expand All @@ -790,7 +794,22 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
return NULL;
}

records = flb_mp_count(p_buf, p_size);
normalized_buf = NULL;
normalized_size = 0;

ret = flb_mp_normalize_log_buffer_groups_msgpack(p_buf, p_size,
&normalized_buf,
&normalized_size);
if (ret == 0) {
if (p_buf != evc->data) {
flb_free(p_buf);
}

p_buf = normalized_buf;
p_size = normalized_size;
}

records = flb_mp_count_log_records(p_buf, p_size);
tmp = flb_event_chunk_create(evc->type, records, evc->tag, flb_sds_len(evc->tag), p_buf, p_size);
if (!tmp) {
flb_coro_destroy(coro);
Expand Down Expand Up @@ -1190,10 +1209,13 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
*/
static inline void flb_output_return(int ret, struct flb_coro *co) {
int n;
int records;
int pipe_fd;
uint32_t set;
uint64_t val;
size_t bytes;
struct flb_task *task;
struct flb_event_chunk *counted_event_chunk;
struct flb_output_flush *out_flush;
struct flb_output_instance *o_ins;
struct flb_out_thread_instance *th_ins = NULL;
Expand All @@ -1202,10 +1224,22 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {
o_ins = out_flush->o_ins;
task = out_flush->task;

flb_task_acquire_lock(task);
if (out_flush->processed_event_chunk) {
counted_event_chunk = out_flush->processed_event_chunk;
}
else {
counted_event_chunk = task->event_chunk;
}

flb_task_deactivate_route(task, o_ins);
records = task->event_chunk->total_events;
if (counted_event_chunk->type == FLB_EVENT_TYPE_LOGS) {
records = counted_event_chunk->total_events;
}
bytes = counted_event_chunk->size;

flb_task_acquire_lock(task);
flb_task_set_route_data(task, o_ins, records, bytes);
flb_task_deactivate_route(task, o_ins);
flb_task_release_lock(task);

#ifdef FLB_HAVE_CHUNK_TRACE
Expand Down
13 changes: 13 additions & 0 deletions include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,19 @@ struct flb_processor {

flb_pipefd_t notification_channel;

/*
* Processor chain accounting metrics
* ----------------------------------
* These counters are registered in the owner context metrics (input/output)
* and updated per processor unit execution.
*/
struct cmt_counter *cmt_invocations;
struct cmt_counter *cmt_errors;
struct cmt_counter *cmt_items_in;
struct cmt_counter *cmt_items_out;
struct cmt_counter *cmt_items_drop;
struct cmt_counter *cmt_items_add;

/* Fluent Bit context */
struct flb_config *config;
};
Expand Down
48 changes: 48 additions & 0 deletions include/fluent-bit/flb_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@

struct flb_task_route {
int status;
int records;
size_t bytes;
struct flb_output_instance *out;
struct mk_list _head;
};
Expand Down Expand Up @@ -257,6 +259,52 @@ static FLB_INLINE void flb_task_set_route_status(
}
}

static FLB_INLINE void flb_task_set_route_data(
struct flb_task *task,
struct flb_output_instance *o_ins,
int records,
size_t bytes)
{
struct mk_list *iterator;
struct flb_task_route *route;

mk_list_foreach(iterator, &task->routes) {
route = mk_list_entry(iterator, struct flb_task_route, _head);

if (route->out == o_ins) {
route->records = records;
route->bytes = bytes;
break;
}
}
}

static FLB_INLINE int flb_task_get_route_data(
struct flb_task *task,
struct flb_output_instance *o_ins,
int *records,
size_t *bytes)
{
struct mk_list *iterator;
struct flb_task_route *route;

if (records == NULL || bytes == NULL) {
return -1;
}

mk_list_foreach(iterator, &task->routes) {
route = mk_list_entry(iterator, struct flb_task_route, _head);

if (route->out == o_ins) {
*records = route->records;
*bytes = route->bytes;
return 0;
}
}

return -1;
}


static FLB_INLINE void flb_task_activate_route(
struct flb_task *task,
Expand Down
3 changes: 2 additions & 1 deletion plugins/filter_alter_size/alter_size.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fluent-bit/flb_filter_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

Expand Down Expand Up @@ -142,7 +143,7 @@ static int cb_alter_size_filter(const void *data, size_t bytes,
count = 0;

/* Count number of current items */
total = flb_mp_count(data, bytes);
total = flb_mp_count_log_records(data, bytes);
total -= ctx->remove;
if (total <= 0) {
/* zero records */
Expand Down
2 changes: 1 addition & 1 deletion plugins/out_azure/azure.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static int azure_format(const void *in_buf, size_t in_bytes,
int ret;

/* Count number of items */
array_size = flb_mp_count(in_buf, in_bytes);
array_size = flb_mp_count_log_records(in_buf, in_bytes);

ret = flb_log_event_decoder_init(&log_decoder, (char *) in_buf, in_bytes);

Expand Down
2 changes: 1 addition & 1 deletion plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ static int azure_kusto_format(struct flb_azure_kusto *ctx, const char *tag, int
flb_sds_t out_buf;

/* Create array for all records */
records = flb_mp_count(data, bytes);
records = flb_mp_count_log_records(data, bytes);
if (records <= 0) {
flb_plg_error(ctx->ins, "error counting msgpack entries");
return -1;
Expand Down
31 changes: 17 additions & 14 deletions plugins/out_azure_logs_ingestion/azure_logs_ingestion.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_hmac.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <msgpack.h>

#include "azure_logs_ingestion.h"
Expand Down Expand Up @@ -58,14 +60,13 @@ static int az_li_format(const void *in_buf, size_t in_bytes,
struct flb_config *config)
{
int i;
int ret;
int array_size = 0;
int map_size;
size_t off = 0;
double t;
struct flb_time tm;
msgpack_unpacked result;
msgpack_object root;
msgpack_object *obj;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
msgpack_object map;
msgpack_object k;
msgpack_object v;
Expand All @@ -80,26 +81,28 @@ static int az_li_format(const void *in_buf, size_t in_bytes,
int len;

/* Count number of items */
array_size = flb_mp_count(in_buf, in_bytes);
msgpack_unpacked_init(&result);
array_size = flb_mp_count_log_records(in_buf, in_bytes);

/* Create temporary msgpack buffer */
msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
msgpack_pack_array(&mp_pck, array_size);

off = 0;
while (msgpack_unpack_next(&result, in_buf, in_bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
root = result.data;
ret = flb_log_event_decoder_init(&log_decoder, (char *) in_buf, in_bytes);
if (ret != FLB_EVENT_DECODER_SUCCESS) {
msgpack_sbuffer_destroy(&mp_sbuf);
return -1;
}

/* Get timestamp */
flb_time_pop_from_msgpack(&tm, &result, &obj);
while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) ==
FLB_EVENT_DECODER_SUCCESS) {
flb_time_copy(&tm, &log_event.timestamp);

/* Create temporary msgpack buffer */
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);

map = root.via.array.ptr[1];
map = *log_event.body;
map_size = map.via.map.size;

msgpack_pack_map(&mp_pck, map_size + 1);
Expand Down Expand Up @@ -147,12 +150,12 @@ static int az_li_format(const void *in_buf, size_t in_bytes,
if (!record) {
flb_errno();
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
flb_log_event_decoder_destroy(&log_decoder);
return -1;
}

msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_unpacked_destroy(&result);
flb_log_event_decoder_destroy(&log_decoder);

*out_buf = record;
*out_size = flb_sds_len(record);
Expand Down
2 changes: 1 addition & 1 deletion plugins/out_bigquery/bigquery.c
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ static int bigquery_format(const void *data, size_t bytes,
return -1;
}

array_size = flb_mp_count(data, bytes);
array_size = flb_mp_count_log_records(data, bytes);

/* Create temporary msgpack buffer */
msgpack_sbuffer_init(&mp_sbuf);
Expand Down
54 changes: 22 additions & 32 deletions plugins/out_counter/counter.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,13 @@
#include <sys/types.h>
#include <sys/stat.h>

struct flb_counter_ctx {
uint64_t total;
};

static int cb_counter_init(struct flb_output_instance *ins,
struct flb_config *config,
void *data)
{
(void) ins;
(void) config;
(void) data;
struct flb_counter_ctx *ctx;

ctx = flb_malloc(sizeof(struct flb_counter_ctx));
if (!ctx) {
flb_errno();
return -1;
}
ctx->total = 0;
flb_output_set_context(ins, ctx);
if (flb_output_config_map_set(ins, (void *)ctx) == -1) {
flb_plg_error(ins, "unable to load configuration");
flb_free(ctx);
return -1;
}

return 0;
}
Expand All @@ -61,32 +43,40 @@ static void cb_counter_flush(struct flb_event_chunk *event_chunk,
struct flb_config *config)
{
(void) i_ins;
(void) out_flush;
(void) out_context;
(void) config;
size_t cnt;
struct flb_counter_ctx *ctx = out_context;
size_t serialized_events;
size_t log_records;
size_t total;
struct flb_time tm;

/* Count number of parent items */
cnt = flb_mp_count(event_chunk->data, event_chunk->size);
ctx->total += cnt;
/* Count number of serialized msgpack root objects */
serialized_events = flb_mp_count(event_chunk->data, event_chunk->size);

/* Count number of logical log records (group markers excluded) */
log_records = 0;
if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
log_records = flb_mp_count_log_records(event_chunk->data,
event_chunk->size);
}
total = serialized_events;

flb_time_get(&tm);
printf("%f,%lu (total = %"PRIu64")\n", flb_time_to_double(&tm), cnt,
ctx->total);
printf("{\"ts\":%.6f,\"serialized_events\":%zu,\"log_records\":%zu,"
"\"total\":%zu}\n",
flb_time_to_double(&tm),
serialized_events,
log_records,
total);

FLB_OUTPUT_RETURN(FLB_OK);
}

static int cb_counter_exit(void *data, struct flb_config *config)
{
struct flb_counter_ctx *ctx = data;

if (!ctx) {
return 0;
}

flb_free(ctx);
(void) config;
(void) data;
return 0;
}

Expand Down
Loading
Loading