2222#include <fluent-bit/flb_error.h>
2323#include <fluent-bit/flb_pack.h>
2424
25+ #include <fluent-bit/flb_gzip.h>
26+ #include <fluent-bit/flb_zstd.h>
27+ #include <fluent-bit/flb_snappy.h>
28+
2529#include <monkey/monkey.h>
2630#include <monkey/mk_core.h>
2731
@@ -510,6 +514,226 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag,
510514 return ret ;
511515}
512516
517+
518+ /*
519+ * We use two backends for HTTP parsing and it depends on the version of the
520+ * protocol:
521+ *
522+ * http/1.x: we use Monkey HTTP parser: struct mk_http_session.parser
523+ */
524+ static int http_header_lookup (int version , void * ptr , char * key ,
525+ char * * val , size_t * val_len )
526+ {
527+ int key_len ;
528+
529+ /* HTTP/1.1 */
530+ struct mk_list * head ;
531+ struct mk_http_session * session ;
532+ struct mk_http_request * request_11 ;
533+ struct mk_http_header * header ;
534+
535+ /* HTTP/2.0 */
536+ char * value ;
537+ struct flb_http_request * request_20 ;
538+
539+ if (!key ) {
540+ return -1 ;
541+ }
542+
543+ key_len = strlen (key );
544+ if (key_len <= 0 ) {
545+ return -1 ;
546+ }
547+
548+ if (version <= HTTP_PROTOCOL_VERSION_11 ) {
549+ if (!ptr ) {
550+ return -1 ;
551+ }
552+
553+ request_11 = (struct mk_http_request * ) ptr ;
554+ session = request_11 -> session ;
555+ mk_list_foreach (head , & session -> parser .header_list ) {
556+ header = mk_list_entry (head , struct mk_http_header , _head );
557+ if (header -> key .len == key_len &&
558+ strncasecmp (header -> key .data , key , key_len ) == 0 ) {
559+ * val = header -> val .data ;
560+ * val_len = header -> val .len ;
561+ return 0 ;
562+ }
563+ }
564+ return -1 ;
565+ }
566+ else if (version == HTTP_PROTOCOL_VERSION_20 ) {
567+ request_20 = ptr ;
568+ if (!request_20 ) {
569+ return -1 ;
570+ }
571+
572+ value = flb_http_request_get_header (request_20 , key );
573+ if (!value ) {
574+ return -1 ;
575+ }
576+
577+ * val = value ;
578+ * val_len = strlen (value );
579+ return 0 ;
580+ }
581+
582+ return -1 ;
583+ }
584+
585+
586+ static \
587+ int uncompress_zlib (struct flb_http * ctx ,
588+ char * * output_buffer ,
589+ size_t * output_size ,
590+ char * input_buffer ,
591+ size_t input_size )
592+ {
593+ flb_plg_warn (ctx -> ins , "zlib decompression is not supported" );
594+ return 0 ;
595+ }
596+
597+ static \
598+ int uncompress_zstd (struct flb_http * ctx ,
599+ char * * output_buffer ,
600+ size_t * output_size ,
601+ char * input_buffer ,
602+ size_t input_size )
603+ {
604+ int ret ;
605+
606+ ret = flb_zstd_uncompress (input_buffer ,
607+ input_size ,
608+ (void * ) output_buffer ,
609+ output_size );
610+
611+ if (ret != 0 ) {
612+ flb_plg_error (ctx -> ins , "zstd decompression failed" );
613+ return -1 ;
614+ }
615+
616+ return 1 ;
617+ }
618+
619+ static \
620+ int uncompress_deflate (struct flb_http * ctx ,
621+ char * * output_buffer ,
622+ size_t * output_size ,
623+ char * input_buffer ,
624+ size_t input_size )
625+ {
626+ flb_plg_warn (ctx -> ins , "deflate decompression is not supported" );
627+ return 0 ;
628+ }
629+
630+ static \
631+ int uncompress_snappy (struct flb_http * ctx ,
632+ char * * output_buffer ,
633+ size_t * output_size ,
634+ char * input_buffer ,
635+ size_t input_size )
636+ {
637+ int ret ;
638+
639+ ret = flb_snappy_uncompress_framed_data (input_buffer ,
640+ input_size ,
641+ output_buffer ,
642+ output_size );
643+
644+ if (ret != 0 ) {
645+ flb_plg_error (ctx -> ins , "snappy decompression failed" );
646+ return -1 ;
647+ }
648+
649+ return 1 ;
650+ }
651+
652+ static \
653+ int uncompress_gzip (struct flb_http * ctx ,
654+ char * * output_buffer ,
655+ size_t * output_size ,
656+ char * input_buffer ,
657+ size_t input_size )
658+ {
659+ int ret ;
660+
661+ ret = flb_gzip_uncompress (input_buffer ,
662+ input_size ,
663+ (void * ) output_buffer ,
664+ output_size );
665+
666+ if (ret == -1 ) {
667+ flb_error ("[opentelemetry] gzip decompression failed" );
668+
669+ return -1 ;
670+ }
671+
672+ return 1 ;
673+ }
674+
675+ /* Used for HTTP/1.1 */
676+ static int http_prot_uncompress (struct flb_http * ctx ,
677+ struct mk_http_request * request ,
678+ char * * output_buffer ,
679+ size_t * output_size )
680+ {
681+ int ret = 0 ;
682+ char * body ;
683+ size_t body_size ;
684+ char * encoding ;
685+ size_t encoding_len ;
686+
687+ * output_buffer = NULL ;
688+ * output_size = 0 ;
689+
690+ /* get the Content-Encoding */
691+ ret = http_header_lookup (HTTP_PROTOCOL_VERSION_11 ,
692+ request ,
693+ "Content-Encoding" ,
694+ & encoding , & encoding_len );
695+
696+ /* FYI: no encoding was found, assume no payload compression */
697+ if (ret < 0 ) {
698+ return 0 ;
699+ }
700+
701+ /* set the payload pointers */
702+ body = request -> data .data ;
703+ body_size = request -> data .len ;
704+
705+ if (strncasecmp (encoding , "gzip" , 4 ) == 0 && encoding_len == 4 ) {
706+ return uncompress_gzip (ctx ,
707+ output_buffer , output_size ,
708+ body , body_size );
709+ }
710+ else if (strncasecmp (encoding , "zlib" , 4 ) == 0 && encoding_len == 4 ) {
711+ return uncompress_zlib (ctx ,
712+ output_buffer , output_size ,
713+ body , body_size );
714+ }
715+ else if (strncasecmp (encoding , "zstd" , 4 ) == 0 && encoding_len == 4 ) {
716+ return uncompress_zstd (ctx ,
717+ output_buffer , output_size ,
718+ body , body_size );
719+ }
720+ else if (strncasecmp (encoding , "snappy" , 6 ) == 0 && encoding_len == 6 ) {
721+ return uncompress_snappy (ctx ,
722+ output_buffer , output_size ,
723+ body , body_size );
724+ }
725+ else if (strncasecmp (encoding , "deflate" , 7 ) == 0 && encoding_len == 7 ) {
726+ return uncompress_deflate (ctx ,
727+ output_buffer , output_size ,
728+ body , body_size );
729+ }
730+ else {
731+ return -2 ;
732+ }
733+
734+ return 0 ;
735+ }
736+
513737static int process_payload (struct flb_http * ctx , struct http_conn * conn ,
514738 flb_sds_t tag ,
515739 struct mk_http_session * session ,
@@ -522,6 +746,8 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
522746 char * out_chunked = NULL ;
523747 size_t out_chunked_size ;
524748 struct mk_http_header * header ;
749+ char * uncompressed_data = NULL ;
750+ size_t uncompressed_data_size = 0 ;
525751
526752 header = & session -> parser .headers [MK_HEADER_CONTENT_TYPE ];
527753 if (header -> key .data == NULL ) {
@@ -571,6 +797,19 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
571797 request -> data .len = out_chunked_size ;
572798 }
573799
800+ /*
801+ * HTTP/1.x can have the payload compressed, we try to detect based on the
802+ * Content-Encoding header.
803+ */
804+ ret = http_prot_uncompress (ctx ,
805+ request ,
806+ & uncompressed_data ,
807+ & uncompressed_data_size );
808+
809+ if (ret > 0 ) {
810+ request -> data .data = uncompressed_data ;
811+ request -> data .len = uncompressed_data_size ;
812+ }
574813
575814 if (type == HTTP_CONTENT_JSON ) {
576815 ret = parse_payload_json (ctx , tag , request -> data .data , request -> data .len );
@@ -579,6 +818,10 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn,
579818 ret = parse_payload_urlencoded (ctx , tag , request -> data .data , request -> data .len );
580819 }
581820
821+ if (uncompressed_data != NULL ) {
822+ flb_free (uncompressed_data );
823+ }
824+
582825 if (out_chunked ) {
583826 mk_mem_free (out_chunked );
584827 request -> data .data = original_data ;
0 commit comments