@@ -90,50 +90,33 @@ static int get_chunk_event_type(struct flb_input_instance *ins, msgpack_object o
90
90
return type ;
91
91
}
92
92
93
- static int is_gzip_compressed (msgpack_object options )
93
+ static int get_compression_type (msgpack_object options )
94
94
{
95
95
int i ;
96
- msgpack_object k ;
97
- msgpack_object v ;
96
+ msgpack_object k , v ;
98
97
99
98
if (options .type != MSGPACK_OBJECT_MAP ) {
100
99
return -1 ;
101
100
}
102
101
103
-
104
102
for (i = 0 ; i < options .via .map .size ; i ++ ) {
105
103
k = options .via .map .ptr [i ].key ;
106
104
v = options .via .map .ptr [i ].val ;
107
105
108
- if (k .type != MSGPACK_OBJECT_STR ) {
109
- return -1 ;
110
- }
111
-
112
- if (k .via .str .size != 10 ) {
113
- continue ;
114
- }
115
-
116
- if (strncmp (k .via .str .ptr , "compressed" , 10 ) == 0 ) {
117
- if (v .type != MSGPACK_OBJECT_STR ) {
118
- return -1 ;
119
- }
120
-
121
- if (v .via .str .size != 4 ) {
122
- return -1 ;
123
- }
124
-
125
- if (strncmp (v .via .str .ptr , "gzip" , 4 ) == 0 ) {
126
- return FLB_TRUE ;
127
- }
128
- else if (strncmp (v .via .str .ptr , "text" , 4 ) == 0 ) {
129
- return FLB_FALSE ;
106
+ if (k .type == MSGPACK_OBJECT_STR && k .via .str .size == 10 &&
107
+ strncmp (k .via .str .ptr , "compressed" , 10 ) == 0 ) {
108
+ if (v .type == MSGPACK_OBJECT_STR ) {
109
+ if (v .via .str .size == 4 && strncmp (v .via .str .ptr , "gzip" , 4 ) == 0 ) {
110
+ return FLB_COMPRESSION_ALGORITHM_GZIP ;
111
+ }
112
+ if (v .via .str .size == 4 && strncmp (v .via .str .ptr , "zstd" , 4 ) == 0 ) {
113
+ return FLB_COMPRESSION_ALGORITHM_ZSTD ;
114
+ }
130
115
}
131
-
132
- return -1 ;
133
116
}
134
117
}
135
118
136
- return FLB_FALSE ;
119
+ return FLB_COMPRESSION_ALGORITHM_NONE ;
137
120
}
138
121
139
122
static inline void print_msgpack_error_code (struct flb_input_instance * in ,
@@ -1269,6 +1252,8 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
1269
1252
msgpack_unpacked result ;
1270
1253
msgpack_unpacker * unp ;
1271
1254
size_t all_used = 0 ;
1255
+ const char * payload_data = NULL ;
1256
+ size_t payload_len = 0 ;
1272
1257
struct flb_in_fw_config * ctx = conn -> ctx ;
1273
1258
1274
1259
/*
@@ -1524,91 +1509,99 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
1524
1509
}
1525
1510
1526
1511
if (data ) {
1527
- ret = is_gzip_compressed (root .via .array .ptr [2 ]);
1528
- if (ret == -1 ) {
1529
- flb_plg_error (ctx -> ins , "invalid 'compressed' option" );
1530
- msgpack_unpacked_destroy (& result );
1531
- msgpack_unpacker_free (unp );
1532
- flb_sds_destroy (out_tag );
1533
- return -1 ;
1512
+ /* Get event type early for use in both compressed/uncompressed paths */
1513
+ event_type = FLB_EVENT_TYPE_LOGS ;
1514
+ if (contain_options ) {
1515
+ ret = get_chunk_event_type (ins , root .via .array .ptr [2 ]);
1516
+ if (ret == -1 ) {
1517
+ flb_plg_error (ctx -> ins , "invalid chunk event type" );
1518
+ msgpack_unpacked_destroy (& result );
1519
+ flb_sds_destroy (out_tag );
1520
+ msgpack_unpacker_free (unp );
1521
+ return -1 ;
1522
+ }
1523
+ event_type = ret ;
1534
1524
}
1535
1525
1536
- if (ret == FLB_TRUE ) {
1537
- size_t remaining = len ;
1538
-
1539
- while (remaining > 0 ) {
1540
- ret = flb_gzip_uncompress_multi ((void * ) (data + (len - remaining )), remaining ,
1541
- & gz_data , & gz_size , & remaining );
1542
-
1543
- if (ret == -1 ) {
1544
- flb_plg_error (ctx -> ins , "gzip uncompress failure" );
1526
+ /* Initialize decompressor on first compressed chunk */
1527
+ if (conn -> d_ctx == NULL && contain_options ) {
1528
+ int type = get_compression_type (root .via .array .ptr [2 ]);
1529
+ if (type > 0 ) {
1530
+ conn -> compression_type = type ;
1531
+ conn -> d_ctx = flb_decompression_context_create (
1532
+ conn -> compression_type ,
1533
+ FLB_DECOMPRESSION_BUFFER_SIZE );
1534
+ if (!conn -> d_ctx ) {
1535
+ flb_plg_error (ctx -> ins , "failed to create decompression context" );
1545
1536
msgpack_unpacked_destroy (& result );
1546
- msgpack_unpacker_free (unp );
1547
1537
flb_sds_destroy (out_tag );
1538
+ msgpack_unpacker_free (unp );
1548
1539
return -1 ;
1549
1540
}
1541
+ }
1542
+ }
1550
1543
1551
- event_type = FLB_EVENT_TYPE_LOGS ;
1552
- if (contain_options ) {
1553
- ret = get_chunk_event_type (ins , root .via .array .ptr [2 ]);
1554
- if (ret == -1 ) {
1555
- msgpack_unpacked_destroy (& result );
1556
- msgpack_unpacker_free (unp );
1557
- flb_sds_destroy (out_tag );
1558
- flb_free (gz_data );
1559
- return -1 ;
1560
- }
1561
- event_type = ret ;
1562
- }
1544
+ if (conn -> compression_type != FLB_COMPRESSION_ALGORITHM_NONE ) {
1545
+ char * decomp_buf = NULL ;
1546
+ uint8_t * append_ptr ;
1547
+ size_t available_space ;
1563
1548
1564
- ret = append_log (ins , conn ,
1565
- event_type ,
1566
- out_tag , gz_data , gz_size );
1567
- if (ret == -1 ) {
1568
- msgpack_unpacked_destroy (& result );
1569
- msgpack_unpacker_free (unp );
1570
- flb_sds_destroy (out_tag );
1571
- flb_free (gz_data );
1549
+ available_space = flb_decompression_context_get_available_space (conn -> d_ctx );
1550
+ if (len > available_space ) {
1551
+ size_t required_size = conn -> d_ctx -> input_buffer_length + len ;
1552
+ if (flb_decompression_context_resize_buffer (conn -> d_ctx , required_size ) != 0 ) {
1553
+ flb_plg_error (ctx -> ins , "cannot resize decompression buffer" );
1572
1554
return -1 ;
1573
1555
}
1574
- flb_free (gz_data );
1575
1556
}
1576
- }
1577
- else {
1578
- event_type = FLB_EVENT_TYPE_LOGS ;
1579
- if (contain_options ) {
1580
- ret = get_chunk_event_type (ins , root .via .array .ptr [2 ]);
1581
- if (ret == -1 ) {
1582
- msgpack_unpacked_destroy (& result );
1583
- msgpack_unpacker_free (unp );
1584
- flb_sds_destroy (out_tag );
1557
+ append_ptr = flb_decompression_context_get_append_buffer (conn -> d_ctx );
1558
+ memcpy (append_ptr , data , len );
1559
+ conn -> d_ctx -> input_buffer_length += len ;
1560
+
1561
+ decomp_buf = flb_malloc (ctx -> buffer_chunk_size );
1562
+ if (!decomp_buf ) {
1563
+ flb_errno ();
1564
+ return -1 ;
1565
+ }
1566
+
1567
+ do {
1568
+ size_t decomp_len = ctx -> buffer_chunk_size ;
1569
+ int decomp_ret = flb_decompress (conn -> d_ctx , decomp_buf , & decomp_len );
1570
+
1571
+ if (decomp_ret == FLB_DECOMPRESSOR_FAILURE ) {
1572
+ flb_plg_error (ctx -> ins , "decompression failed, data may be corrupt" );
1573
+ flb_free (decomp_buf );
1585
1574
return -1 ;
1586
1575
}
1587
- event_type = ret ;
1588
- }
1589
1576
1590
- ret = append_log (ins , conn ,
1591
- event_type ,
1592
- out_tag , data , len );
1593
- if (ret == -1 ) {
1594
- msgpack_unpacked_destroy (& result );
1595
- msgpack_unpacker_free (unp );
1596
- flb_sds_destroy (out_tag );
1577
+ if (decomp_len > 0 ) {
1578
+ if (append_log (ins , conn , event_type , out_tag , decomp_buf , decomp_len ) == -1 ) {
1579
+ flb_free (decomp_buf );
1580
+ return -1 ;
1581
+ }
1582
+ }
1583
+ } while (ret == 0 );
1584
+
1585
+ flb_free (decomp_buf );
1586
+ }
1587
+ else {
1588
+ if (append_log (ins , conn , event_type , out_tag , data , len ) == -1 ) {
1597
1589
return -1 ;
1598
1590
}
1599
1591
}
1592
+ }
1600
1593
1601
- /* Handle ACK response */
1602
- if (chunk_id != -1 ) {
1603
- chunk = root .via .array .ptr [2 ].via .map .ptr [chunk_id ].val ;
1604
- send_ack (ctx -> ins , conn , chunk );
1605
- }
1594
+ /* Handle ACK response (common to all paths) */
1595
+ if (chunk_id != -1 ) {
1596
+ chunk = root .via .array .ptr [2 ].via .map .ptr [chunk_id ].val ;
1597
+ send_ack (ctx -> ins , conn , chunk );
1606
1598
}
1607
1599
}
1608
1600
else {
1609
1601
flb_plg_warn (ctx -> ins , "invalid data format, type=%i" ,
1610
1602
entry .type );
1611
1603
msgpack_unpacked_destroy (& result );
1604
+ flb_sds_destroy (out_tag );
1612
1605
msgpack_unpacker_free (unp );
1613
1606
return -1 ;
1614
1607
}
0 commit comments