diff --git a/include/fluent-bit/flb_macros.h b/include/fluent-bit/flb_macros.h index b66fe063fa1..dcadcdb0e53 100644 --- a/include/fluent-bit/flb_macros.h +++ b/include/fluent-bit/flb_macros.h @@ -20,10 +20,11 @@ #ifndef FLB_MACROS_H #define FLB_MACROS_H +#include #include -#define FLB_FALSE 0 -#define FLB_TRUE !FLB_FALSE +#define FLB_FALSE false +#define FLB_TRUE true /* Return values */ #define FLB_ERROR 0 diff --git a/include/fluent-bit/flb_metrics.h b/include/fluent-bit/flb_metrics.h index b57a1d06bed..f94d06700b4 100644 --- a/include/fluent-bit/flb_metrics.h +++ b/include/fluent-bit/flb_metrics.h @@ -42,6 +42,13 @@ #include #include +/* + * v1 HTTP endpoint metrics + * ------------------------ + * These are functions that are not part of the CMetrics library, its' the old interface + * to ship internal metrics which is part of the v1 HTTP endpoint. + */ + /* Metrics IDs for general purpose (used by core and Plugins */ #define FLB_METRIC_N_RECORDS 0 #define FLB_METRIC_N_BYTES 1 @@ -85,6 +92,10 @@ int flb_metrics_dump_values(char **out_buf, size_t *out_size, struct flb_metrics *me); int flb_metrics_destroy(struct flb_metrics *metrics); int flb_metrics_fluentbit_add(struct flb_config *ctx, struct cmt *cmt); +/* ! end of v1 HTTP endpoint metrics */ + +/* General metrics utilities */ +bool flb_metrics_is_empty(struct cmt *cmt); #endif #endif /* FLB_HAVE_METRICS */ diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 4a28a728813..d9675e48bc4 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -49,6 +49,7 @@ #include #include +#include #include #include #include @@ -669,6 +670,13 @@ static FLB_INLINE void output_pre_cb_flush(void) flb_debug("[output] skipping flush for event chunk with zero records."); FLB_OUTPUT_RETURN(FLB_OK); } + /* Skip flush if processed event chunk has no data (empty after processing) */ + else if (persisted_params.event_chunk && + persisted_params.event_chunk->type == FLB_EVENT_TYPE_METRICS && + persisted_params.event_chunk->size == 0) { + flb_debug("[output] skipping flush for event chunk with no data after processing."); + FLB_OUTPUT_RETURN(FLB_OK); + } /* Continue, we will resume later */ out_p = persisted_params.out_plugin; @@ -708,7 +716,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, { int ret; size_t records; - void *p_buf; + void *p_buf = NULL; size_t p_size; size_t stack_size; struct flb_coro *coro; @@ -725,8 +733,10 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, struct ctrace *trace_context; struct cprof *profile_context; size_t chunk_offset; + struct cmt *encode_context = NULL; struct cmt *cmt_out_context = NULL; + /* Custom output coroutine info */ out_flush = (struct flb_output_flush *) flb_calloc(1, sizeof(struct flb_output_flush)); if (!out_flush) { @@ -756,6 +766,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, /* Logs processor */ evc = task->event_chunk; + if (flb_processor_is_active(o_ins->processor)) { if (evc->type == FLB_EVENT_TYPE_LOGS) { /* run the processor */ @@ -786,10 +797,8 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, if (p_buf == NULL) { flb_errno(); - flb_coro_destroy(coro); flb_free(out_flush); - return NULL; } @@ -803,6 +812,8 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, (char *) evc->data, evc->size, &chunk_offset)) == CMT_DECODE_MSGPACK_SUCCESS) { + + cmt_out_context = NULL; ret = flb_processor_run(o_ins->processor, 0, FLB_PROCESSOR_METRICS, @@ -814,6 +825,22 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, NULL); if (ret == 0) { + if (cmt_out_context) { + encode_context = cmt_out_context; + } + else { + encode_context = metrics_context; + } + + /* if the cmetrics context lacks time series just skip it */ + if (flb_metrics_is_empty(encode_context)) { + if (encode_context != metrics_context) { + cmt_destroy(encode_context); + } + cmt_destroy(metrics_context); + continue; + } + if (cmt_out_context != NULL) { ret = cmt_encode_msgpack_create(cmt_out_context, &serialized_context_buffer, @@ -822,7 +849,6 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, if (cmt_out_context != metrics_context) { cmt_destroy(cmt_out_context); } - } else { ret = cmt_encode_msgpack_create(metrics_context, @@ -836,23 +862,17 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, flb_coro_destroy(coro); flb_free(out_flush); flb_free(p_buf); - return NULL; } - if ((serialization_buffer_offset + - serialized_context_size) > p_size) { - resized_serialization_buffer = \ - flb_realloc(p_buf, p_size + serialized_context_size); - + if ((serialization_buffer_offset + serialized_context_size) > p_size) { + resized_serialization_buffer = flb_realloc(p_buf, p_size + serialized_context_size); if (resized_serialization_buffer == NULL) { flb_errno(); - cmt_encode_msgpack_destroy(serialized_context_buffer); flb_coro_destroy(coro); flb_free(out_flush); flb_free(p_buf); - return NULL; } @@ -868,29 +888,49 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, cmt_encode_msgpack_destroy(serialized_context_buffer); } + else { + cmt_destroy(metrics_context); + if (cmt_out_context != NULL && cmt_out_context != metrics_context) { + cmt_destroy(cmt_out_context); + } + flb_coro_destroy(coro); + flb_free(out_flush); + flb_free(p_buf); + return NULL; + } } if (serialization_buffer_offset == 0) { - flb_coro_destroy(coro); - flb_free(out_flush); + flb_debug("[output] skipping flush for metrics event chunk with zero metrics after processing."); flb_free(p_buf); - - return NULL; + p_buf = NULL; /* Mark as freed to avoid double-free */ + + /* Create an empty processed event chunk to signal success */ + out_flush->processed_event_chunk = flb_event_chunk_create( + evc->type, + 0, + evc->tag, + flb_sds_len(evc->tag), + NULL, + 0); + } + else { + p_size = serialization_buffer_offset; + out_flush->processed_event_chunk = flb_event_chunk_create( + evc->type, + 0, + evc->tag, + flb_sds_len(evc->tag), + p_buf, + p_size); } - - out_flush->processed_event_chunk = flb_event_chunk_create( - evc->type, - 0, - evc->tag, - flb_sds_len(evc->tag), - p_buf, - p_size); if (out_flush->processed_event_chunk == NULL) { flb_coro_destroy(coro); flb_free(out_flush); - flb_free(p_buf); - + if (p_buf != NULL) { + flb_free(p_buf); + } return NULL; } } diff --git a/src/flb_input_metric.c b/src/flb_input_metric.c index 9daf21ba037..748273fe5dd 100644 --- a/src/flb_input_metric.c +++ b/src/flb_input_metric.c @@ -22,6 +22,7 @@ #include #include #include +#include static int input_metrics_append(struct flb_input_instance *ins, size_t processor_starting_stage, @@ -33,6 +34,7 @@ static int input_metrics_append(struct flb_input_instance *ins, size_t mt_size; int processor_is_active; struct cmt *out_context = NULL; + struct cmt *encode_context; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -60,29 +62,31 @@ static int input_metrics_append(struct flb_input_instance *ins, } } + if (out_context) { + encode_context = out_context; + } + else { + encode_context = cmt; + } - if (out_context != NULL) { - /* Convert metrics to msgpack */ - ret = cmt_encode_msgpack_create(out_context, &mt_buf, &mt_size); - - if (out_context != cmt) { + /* Drop the context if it contains no metrics */ + if (encode_context == NULL || flb_metrics_is_empty(encode_context)) { + if (out_context && out_context != cmt) { cmt_destroy(out_context); } + return 0; + } - if (ret != 0) { - flb_plg_error(ins, "could not encode metrics"); + /* Convert metrics to msgpack */ + ret = cmt_encode_msgpack_create(encode_context, &mt_buf, &mt_size); - return -1; - } + if (out_context && out_context != cmt) { + cmt_destroy(out_context); } - else { - /* Convert metrics to msgpack */ - ret = cmt_encode_msgpack_create(cmt, &mt_buf, &mt_size); - if (ret != 0) { - flb_plg_error(ins, "could not encode metrics"); - return -1; - } + if (ret != 0) { + flb_plg_error(ins, "could not encode metrics"); + return -1; } /* Append packed metrics */ diff --git a/src/flb_metrics.c b/src/flb_metrics.c index d97996fe34d..2797fd86235 100644 --- a/src/flb_metrics.c +++ b/src/flb_metrics.c @@ -376,3 +376,12 @@ int flb_metrics_fluentbit_add(struct flb_config *ctx, struct cmt *cmt) return 0; } + +bool flb_metrics_is_empty(struct cmt *cmt) +{ + return cfl_list_is_empty(&cmt->counters) && + cfl_list_is_empty(&cmt->gauges) && + cfl_list_is_empty(&cmt->histograms) && + cfl_list_is_empty(&cmt->summaries) && + cfl_list_is_empty(&cmt->untypeds); +}