49
49
#include <fluent-bit/flb_event.h>
50
50
#include <fluent-bit/flb_processor.h>
51
51
52
+ #include <cfl/cfl.h>
52
53
#include <cmetrics/cmetrics.h>
53
54
#include <cmetrics/cmt_gauge.h>
54
55
#include <cmetrics/cmt_counter.h>
@@ -669,6 +670,13 @@ static FLB_INLINE void output_pre_cb_flush(void)
669
670
flb_debug ("[output] skipping flush for event chunk with zero records." );
670
671
FLB_OUTPUT_RETURN (FLB_OK );
671
672
}
673
+ /* Skip flush if processed event chunk has no data (empty after processing) */
674
+ else if (persisted_params .event_chunk &&
675
+ persisted_params .event_chunk -> type == FLB_EVENT_TYPE_METRICS &&
676
+ persisted_params .event_chunk -> size == 0 ) {
677
+ flb_debug ("[output] skipping flush for event chunk with no data after processing." );
678
+ FLB_OUTPUT_RETURN (FLB_OK );
679
+ }
672
680
673
681
/* Continue, we will resume later */
674
682
out_p = persisted_params .out_plugin ;
@@ -708,7 +716,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
708
716
{
709
717
int ret ;
710
718
size_t records ;
711
- void * p_buf ;
719
+ void * p_buf = NULL ;
712
720
size_t p_size ;
713
721
size_t stack_size ;
714
722
struct flb_coro * coro ;
@@ -725,8 +733,10 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
725
733
struct ctrace * trace_context ;
726
734
struct cprof * profile_context ;
727
735
size_t chunk_offset ;
736
+ struct cmt * encode_context = NULL ;
728
737
struct cmt * cmt_out_context = NULL ;
729
738
739
+
730
740
/* Custom output coroutine info */
731
741
out_flush = (struct flb_output_flush * ) flb_calloc (1 , sizeof (struct flb_output_flush ));
732
742
if (!out_flush ) {
@@ -756,6 +766,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
756
766
757
767
/* Logs processor */
758
768
evc = task -> event_chunk ;
769
+
759
770
if (flb_processor_is_active (o_ins -> processor )) {
760
771
if (evc -> type == FLB_EVENT_TYPE_LOGS ) {
761
772
/* run the processor */
@@ -786,10 +797,8 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
786
797
787
798
if (p_buf == NULL ) {
788
799
flb_errno ();
789
-
790
800
flb_coro_destroy (coro );
791
801
flb_free (out_flush );
792
-
793
802
return NULL ;
794
803
}
795
804
@@ -803,6 +812,7 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
803
812
(char * ) evc -> data ,
804
813
evc -> size ,
805
814
& chunk_offset )) == CMT_DECODE_MSGPACK_SUCCESS ) {
815
+
806
816
ret = flb_processor_run (o_ins -> processor ,
807
817
0 ,
808
818
FLB_PROCESSOR_METRICS ,
@@ -814,6 +824,22 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
814
824
NULL );
815
825
816
826
if (ret == 0 ) {
827
+ if (cmt_out_context ) {
828
+ encode_context = cmt_out_context ;
829
+ }
830
+ else {
831
+ encode_context = metrics_context ;
832
+ }
833
+
834
+ /* if the cmetrics context lack of time series just skip it */
835
+ if (flb_metrics_is_empty (encode_context )) {
836
+ if (encode_context != metrics_context ) {
837
+ cmt_destroy (encode_context );
838
+ }
839
+ cmt_destroy (metrics_context );
840
+ continue ;
841
+ }
842
+
817
843
if (cmt_out_context != NULL ) {
818
844
ret = cmt_encode_msgpack_create (cmt_out_context ,
819
845
& serialized_context_buffer ,
@@ -822,7 +848,6 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
822
848
if (cmt_out_context != metrics_context ) {
823
849
cmt_destroy (cmt_out_context );
824
850
}
825
-
826
851
}
827
852
else {
828
853
ret = cmt_encode_msgpack_create (metrics_context ,
@@ -836,23 +861,17 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
836
861
flb_coro_destroy (coro );
837
862
flb_free (out_flush );
838
863
flb_free (p_buf );
839
-
840
864
return NULL ;
841
865
}
842
866
843
- if ((serialization_buffer_offset +
844
- serialized_context_size ) > p_size ) {
845
- resized_serialization_buffer = \
846
- flb_realloc (p_buf , p_size + serialized_context_size );
847
-
867
+ if ((serialization_buffer_offset + serialized_context_size ) > p_size ) {
868
+ resized_serialization_buffer = flb_realloc (p_buf , p_size + serialized_context_size );
848
869
if (resized_serialization_buffer == NULL ) {
849
870
flb_errno ();
850
-
851
871
cmt_encode_msgpack_destroy (serialized_context_buffer );
852
872
flb_coro_destroy (coro );
853
873
flb_free (out_flush );
854
874
flb_free (p_buf );
855
-
856
875
return NULL ;
857
876
}
858
877
@@ -871,26 +890,35 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
871
890
}
872
891
873
892
if (serialization_buffer_offset == 0 ) {
874
- flb_coro_destroy (coro );
875
- flb_free (out_flush );
893
+ flb_debug ("[output] skipping flush for metrics event chunk with zero metrics after processing." );
876
894
flb_free (p_buf );
877
-
878
- return NULL ;
895
+ p_buf = NULL ; /* Mark as freed to avoid double-free */
896
+
897
+ /* Create an empty processed event chunk to signal success */
898
+ out_flush -> processed_event_chunk = flb_event_chunk_create (
899
+ evc -> type ,
900
+ 0 ,
901
+ evc -> tag ,
902
+ flb_sds_len (evc -> tag ),
903
+ NULL ,
904
+ 0 );
905
+ }
906
+ else {
907
+ out_flush -> processed_event_chunk = flb_event_chunk_create (
908
+ evc -> type ,
909
+ 0 ,
910
+ evc -> tag ,
911
+ flb_sds_len (evc -> tag ),
912
+ p_buf ,
913
+ p_size );
879
914
}
880
-
881
- out_flush -> processed_event_chunk = flb_event_chunk_create (
882
- evc -> type ,
883
- 0 ,
884
- evc -> tag ,
885
- flb_sds_len (evc -> tag ),
886
- p_buf ,
887
- p_size );
888
915
889
916
if (out_flush -> processed_event_chunk == NULL ) {
890
917
flb_coro_destroy (coro );
891
918
flb_free (out_flush );
892
- flb_free (p_buf );
893
-
919
+ if (p_buf != NULL ) {
920
+ flb_free (p_buf );
921
+ }
894
922
return NULL ;
895
923
}
896
924
}
0 commit comments