24
24
#include <fluent-bit/flb_pack.h>
25
25
#include <fluent-bit/flb_socket.h>
26
26
#include <fluent-bit/flb_gzip.h>
27
+ #include <fluent-bit/flb_zstd.h>
27
28
#include <fluent-bit/flb_pack.h>
28
29
#include <sys/types.h>
29
30
#include <sys/stat.h>
@@ -678,6 +679,117 @@ void flb_test_forward_gzip()
678
679
test_ctx_destroy (ctx );
679
680
}
680
681
682
+ /*
683
+ * Creates a forward-protocol-compliant, Zstd-compressed MessagePack payload.
684
+ * The final structure is: [tag, compressed_events, {options}]
685
+ */
686
+ static int create_simple_json_zstd (msgpack_sbuffer * sbuf )
687
+ {
688
+ int ret ;
689
+ char * event_buf ;
690
+ size_t event_size ;
691
+ char * compressed_buf ;
692
+ size_t compressed_size ;
693
+ int root_type ;
694
+ msgpack_packer pck ;
695
+
696
+ char * tag = "test" ;
697
+ char event_json [] = "[1234567890,{\"test\":\"msg\"}]" ;
698
+
699
+ ret = flb_pack_json (event_json , strlen (event_json ),
700
+ & event_buf , & event_size , & root_type , NULL );
701
+ if (!TEST_CHECK (ret == 0 )) {
702
+ return -1 ;
703
+ }
704
+
705
+ ret = flb_zstd_compress (event_buf , event_size ,
706
+ (void * * )& compressed_buf , & compressed_size );
707
+ if (!TEST_CHECK (ret == 0 )) {
708
+ flb_free (event_buf );
709
+ return -1 ;
710
+ }
711
+ flb_free (event_buf );
712
+
713
+ /* Create temporary msgpack buffer */
714
+ msgpack_packer_init (& pck , sbuf , msgpack_sbuffer_write );
715
+
716
+ msgpack_pack_array (& pck , 3 );
717
+ msgpack_pack_str_with_body (& pck , tag , strlen (tag ));
718
+ msgpack_pack_bin_with_body (& pck , compressed_buf , compressed_size );
719
+ msgpack_pack_map (& pck , 2 );
720
+ msgpack_pack_str_with_body (& pck , "compressed" , 10 );
721
+ msgpack_pack_str_with_body (& pck , "zstd" , 4 );
722
+ msgpack_pack_str_with_body (& pck , "size" , 4 );
723
+ msgpack_pack_uint64 (& pck , event_size );
724
+
725
+ flb_free (compressed_buf );
726
+
727
+ return 0 ;
728
+ }
729
+
730
+ void flb_test_forward_zstd ()
731
+ {
732
+ struct flb_lib_out_cb cb_data ;
733
+ struct test_ctx * ctx ;
734
+ flb_sockfd_t fd ;
735
+ int ret ;
736
+ int num ;
737
+ ssize_t w_size ;
738
+
739
+ char * buf ;
740
+ size_t size ;
741
+
742
+ msgpack_sbuffer sbuf ;
743
+
744
+ clear_output_num ();
745
+
746
+ cb_data .cb = cb_check_result_json ;
747
+ cb_data .data = "\"test\":\"msg\"" ;
748
+
749
+ ctx = test_ctx_create (& cb_data );
750
+ if (!TEST_CHECK (ctx != NULL )) {
751
+ TEST_MSG ("test_ctx_create failed" );
752
+ exit (EXIT_FAILURE );
753
+ }
754
+
755
+ ret = flb_output_set (ctx -> flb , ctx -> o_ffd ,
756
+ "match" , "test" ,
757
+ "format" , "json" ,
758
+ NULL );
759
+ TEST_CHECK (ret == 0 );
760
+
761
+ ret = flb_start (ctx -> flb );
762
+ TEST_CHECK (ret == 0 );
763
+
764
+ fd = connect_tcp (NULL , -1 );
765
+ if (!TEST_CHECK (fd >= 0 )) {
766
+ exit (EXIT_FAILURE );
767
+ }
768
+
769
+ msgpack_sbuffer_init (& sbuf );
770
+ create_simple_json_zstd (& sbuf );
771
+
772
+ w_size = send (fd , sbuf .data , sbuf .size , 0 );
773
+ if (!TEST_CHECK (w_size == sbuf .size )) {
774
+ TEST_MSG ("failed to send, errno=%d" , errno );
775
+ flb_socket_close (fd );
776
+ msgpack_sbuffer_destroy (& sbuf );
777
+ exit (EXIT_FAILURE );
778
+ }
779
+
780
+ msgpack_sbuffer_destroy (& sbuf );
781
+
782
+ flb_time_msleep (1500 );
783
+
784
+ num = get_output_num ();
785
+ if (!TEST_CHECK (num > 0 )) {
786
+ TEST_MSG ("no outputs" );
787
+ }
788
+
789
+ flb_socket_close (fd );
790
+ test_ctx_destroy (ctx );
791
+ }
792
+
681
793
682
794
TEST_LIST = {
683
795
{"forward" , flb_test_forward },
@@ -688,5 +800,6 @@ TEST_LIST = {
688
800
{"unix_perm" , flb_test_unix_perm },
689
801
#endif
690
802
{"forward_gzip" , flb_test_forward_gzip },
803
+ {"forward_zstd" , flb_test_forward_zstd },
691
804
{NULL , NULL }
692
805
};
0 commit comments