23
23
#include <fluent-bit/flb_time.h>
24
24
#include <fluent-bit/flb_pack.h>
25
25
#include <fluent-bit/flb_socket.h>
26
+ #include <fluent-bit/flb_gzip.h>
27
+ #include <fluent-bit/flb_pack.h>
26
28
#include <sys/types.h>
27
29
#include <sys/stat.h>
28
30
#ifdef FLB_HAVE_UNIX_SOCKET
@@ -565,6 +567,117 @@ void flb_test_unix_perm()
565
567
}
566
568
#endif /* FLB_HAVE_UNIX_SOCKET */
567
569
570
+ /*
571
+ * Creates a forward-protocol-compliant, Gzip-compressed MessagePack payload.
572
+ * The final structure is: [tag, compressed_events, {options}]
573
+ */
574
+ static int create_simple_json_gzip (msgpack_sbuffer * sbuf )
575
+ {
576
+ int ret ;
577
+ char * event_buf ;
578
+ size_t event_size ;
579
+ char * compressed_buf ;
580
+ size_t compressed_size ;
581
+ int root_type ;
582
+ msgpack_packer pck ;
583
+
584
+ char * tag = "test" ;
585
+ char event_json [] = "[1234567890,{\"test\":\"msg\"}]" ;
586
+
587
+ ret = flb_pack_json (event_json , strlen (event_json ),
588
+ & event_buf , & event_size , & root_type , NULL );
589
+ if (!TEST_CHECK (ret == 0 )) {
590
+ return -1 ;
591
+ }
592
+
593
+ ret = flb_gzip_compress (event_buf , event_size ,
594
+ (void * * )& compressed_buf , & compressed_size );
595
+ if (!TEST_CHECK (ret == 0 )) {
596
+ flb_free (event_buf );
597
+ return -1 ;
598
+ }
599
+ flb_free (event_buf );
600
+
601
+ /* Create temporary msgpack buffer */
602
+ msgpack_packer_init (& pck , sbuf , msgpack_sbuffer_write );
603
+
604
+ msgpack_pack_array (& pck , 3 );
605
+ msgpack_pack_str_with_body (& pck , tag , strlen (tag ));
606
+ msgpack_pack_bin_with_body (& pck , compressed_buf , compressed_size );
607
+ msgpack_pack_map (& pck , 2 );
608
+ msgpack_pack_str_with_body (& pck , "compressed" , 10 );
609
+ msgpack_pack_str_with_body (& pck , "gzip" , 4 );
610
+ msgpack_pack_str_with_body (& pck , "size" , 4 );
611
+ msgpack_pack_uint64 (& pck , event_size );
612
+
613
+ flb_free (compressed_buf );
614
+
615
+ return 0 ;
616
+ }
617
+
618
+ void flb_test_forward_gzip ()
619
+ {
620
+ struct flb_lib_out_cb cb_data ;
621
+ struct test_ctx * ctx ;
622
+ flb_sockfd_t fd ;
623
+ int ret ;
624
+ int num ;
625
+ ssize_t w_size ;
626
+
627
+ char * buf ;
628
+ size_t size ;
629
+
630
+ msgpack_sbuffer sbuf ;
631
+
632
+ clear_output_num ();
633
+
634
+ cb_data .cb = cb_check_result_json ;
635
+ cb_data .data = "\"test\":\"msg\"" ;
636
+
637
+ ctx = test_ctx_create (& cb_data );
638
+ if (!TEST_CHECK (ctx != NULL )) {
639
+ TEST_MSG ("test_ctx_create failed" );
640
+ exit (EXIT_FAILURE );
641
+ }
642
+
643
+ ret = flb_output_set (ctx -> flb , ctx -> o_ffd ,
644
+ "match" , "test" ,
645
+ "format" , "json" ,
646
+ NULL );
647
+ TEST_CHECK (ret == 0 );
648
+
649
+ ret = flb_start (ctx -> flb );
650
+ TEST_CHECK (ret == 0 );
651
+
652
+ fd = connect_tcp (NULL , -1 );
653
+ if (!TEST_CHECK (fd >= 0 )) {
654
+ exit (EXIT_FAILURE );
655
+ }
656
+
657
+ msgpack_sbuffer_init (& sbuf );
658
+ create_simple_json_gzip (& sbuf );
659
+
660
+ w_size = send (fd , sbuf .data , sbuf .size , 0 );
661
+ if (!TEST_CHECK (w_size == sbuf .size )) {
662
+ TEST_MSG ("failed to send, errno=%d" , errno );
663
+ flb_socket_close (fd );
664
+ msgpack_sbuffer_destroy (& sbuf );
665
+ exit (EXIT_FAILURE );
666
+ }
667
+
668
+ msgpack_sbuffer_destroy (& sbuf );
669
+
670
+ flb_time_msleep (1500 );
671
+
672
+ num = get_output_num ();
673
+ if (!TEST_CHECK (num > 0 )) {
674
+ TEST_MSG ("no outputs" );
675
+ }
676
+
677
+ flb_socket_close (fd );
678
+ test_ctx_destroy (ctx );
679
+ }
680
+
568
681
569
682
TEST_LIST = {
570
683
{"forward" , flb_test_forward },
@@ -574,6 +687,6 @@ TEST_LIST = {
574
687
{"unix_path" , flb_test_unix_path },
575
688
{"unix_perm" , flb_test_unix_perm },
576
689
#endif
690
+ {"forward_gzip" , flb_test_forward_gzip },
577
691
{NULL , NULL }
578
692
};
579
-
0 commit comments