4949#include <fluent-bit/flb_event.h>
5050#include <fluent-bit/flb_processor.h>
5151
52+ #include <cfl/cfl.h>
5253#include <cmetrics/cmetrics.h>
5354#include <cmetrics/cmt_gauge.h>
5455#include <cmetrics/cmt_counter.h>
@@ -669,6 +670,13 @@ static FLB_INLINE void output_pre_cb_flush(void)
669670 flb_debug ("[output] skipping flush for event chunk with zero records." );
670671 FLB_OUTPUT_RETURN (FLB_OK );
671672 }
673+ /* Skip flush if processed event chunk has no data (empty after processing) */
674+ else if (persisted_params .event_chunk &&
675+ persisted_params .event_chunk -> type == FLB_EVENT_TYPE_METRICS &&
676+ persisted_params .event_chunk -> size == 0 ) {
677+ flb_debug ("[output] skipping flush for event chunk with no data after processing." );
678+ FLB_OUTPUT_RETURN (FLB_OK );
679+ }
672680
673681 /* Continue, we will resume later */
674682 out_p = persisted_params .out_plugin ;
@@ -708,7 +716,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
708716{
709717 int ret ;
710718 size_t records ;
711- void * p_buf ;
719+ void * p_buf = NULL ;
712720 size_t p_size ;
713721 size_t stack_size ;
714722 struct flb_coro * coro ;
@@ -725,8 +733,10 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
725733 struct ctrace * trace_context ;
726734 struct cprof * profile_context ;
727735 size_t chunk_offset ;
736+ struct cmt * encode_context = NULL ;
728737 struct cmt * cmt_out_context = NULL ;
729738
739+
730740 /* Custom output coroutine info */
731741 out_flush = (struct flb_output_flush * ) flb_calloc (1 , sizeof (struct flb_output_flush ));
732742 if (!out_flush ) {
@@ -756,6 +766,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
756766
757767 /* Logs processor */
758768 evc = task -> event_chunk ;
769+
759770 if (flb_processor_is_active (o_ins -> processor )) {
760771 if (evc -> type == FLB_EVENT_TYPE_LOGS ) {
761772 /* run the processor */
@@ -786,10 +797,8 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
786797
787798 if (p_buf == NULL ) {
788799 flb_errno ();
789-
790800 flb_coro_destroy (coro );
791801 flb_free (out_flush );
792-
793802 return NULL ;
794803 }
795804
@@ -803,6 +812,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
803812 (char * ) evc -> data ,
804813 evc -> size ,
805814 & chunk_offset )) == CMT_DECODE_MSGPACK_SUCCESS ) {
815+
806816 ret = flb_processor_run (o_ins -> processor ,
807817 0 ,
808818 FLB_PROCESSOR_METRICS ,
@@ -814,6 +824,22 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
814824 NULL );
815825
816826 if (ret == 0 ) {
827+ if (cmt_out_context ) {
828+ encode_context = cmt_out_context ;
829+ }
830+ else {
831+ encode_context = metrics_context ;
832+ }
833+
834+ /* if the cmetrics context lack of time series just skip it */
835+ if (flb_metrics_is_empty (encode_context )) {
836+ if (encode_context != metrics_context ) {
837+ cmt_destroy (encode_context );
838+ }
839+ cmt_destroy (metrics_context );
840+ continue ;
841+ }
842+
817843 if (cmt_out_context != NULL ) {
818844 ret = cmt_encode_msgpack_create (cmt_out_context ,
819845 & serialized_context_buffer ,
@@ -822,7 +848,6 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
822848 if (cmt_out_context != metrics_context ) {
823849 cmt_destroy (cmt_out_context );
824850 }
825-
826851 }
827852 else {
828853 ret = cmt_encode_msgpack_create (metrics_context ,
@@ -836,23 +861,17 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
836861 flb_coro_destroy (coro );
837862 flb_free (out_flush );
838863 flb_free (p_buf );
839-
840864 return NULL ;
841865 }
842866
843- if ((serialization_buffer_offset +
844- serialized_context_size ) > p_size ) {
845- resized_serialization_buffer = \
846- flb_realloc (p_buf , p_size + serialized_context_size );
847-
867+ if ((serialization_buffer_offset + serialized_context_size ) > p_size ) {
868+ resized_serialization_buffer = flb_realloc (p_buf , p_size + serialized_context_size );
848869 if (resized_serialization_buffer == NULL ) {
849870 flb_errno ();
850-
851871 cmt_encode_msgpack_destroy (serialized_context_buffer );
852872 flb_coro_destroy (coro );
853873 flb_free (out_flush );
854874 flb_free (p_buf );
855-
856875 return NULL ;
857876 }
858877
@@ -871,26 +890,35 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
871890 }
872891
873892 if (serialization_buffer_offset == 0 ) {
874- flb_coro_destroy (coro );
875- flb_free (out_flush );
893+ flb_debug ("[output] skipping flush for metrics event chunk with zero metrics after processing." );
876894 flb_free (p_buf );
877-
878- return NULL ;
895+ p_buf = NULL ; /* Mark as freed to avoid double-free */
896+
897+ /* Create an empty processed event chunk to signal success */
898+ out_flush -> processed_event_chunk = flb_event_chunk_create (
899+ evc -> type ,
900+ 0 ,
901+ evc -> tag ,
902+ flb_sds_len (evc -> tag ),
903+ NULL ,
904+ 0 );
905+ }
906+ else {
907+ out_flush -> processed_event_chunk = flb_event_chunk_create (
908+ evc -> type ,
909+ 0 ,
910+ evc -> tag ,
911+ flb_sds_len (evc -> tag ),
912+ p_buf ,
913+ p_size );
879914 }
880-
881- out_flush -> processed_event_chunk = flb_event_chunk_create (
882- evc -> type ,
883- 0 ,
884- evc -> tag ,
885- flb_sds_len (evc -> tag ),
886- p_buf ,
887- p_size );
888915
889916 if (out_flush -> processed_event_chunk == NULL ) {
890917 flb_coro_destroy (coro );
891918 flb_free (out_flush );
892- flb_free (p_buf );
893-
919+ if (p_buf != NULL ) {
920+ flb_free (p_buf );
921+ }
894922 return NULL ;
895923 }
896924 }
0 commit comments