Skip to content

Commit 19b7f47

Browse files
committed
in_forward: Handle with flb_compression functions to generalized both of the gzip and zstd compressions
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent fe55e04 commit 19b7f47

File tree

3 files changed

+97
-90
lines changed

3 files changed

+97
-90
lines changed

plugins/in_forward/fw_conn.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_
189189
conn->buf_size = ctx->buffer_chunk_size;
190190
conn->in = ctx->ins;
191191

192+
conn->compression_type = FLB_COMPRESSION_ALGORITHM_NONE;
193+
conn->d_ctx = NULL;
194+
192195
/* Register instance into the event loop */
193196
ret = mk_event_add(flb_engine_evl_get(),
194197
connection->fd,
@@ -219,6 +222,11 @@ int fw_conn_del(struct fw_conn *conn)
219222
/* Release resources */
220223
mk_list_del(&conn->_head);
221224

225+
/* Release decompression context if it exists */
226+
if (conn->d_ctx) {
227+
flb_decompression_context_destroy(conn->d_ctx);
228+
}
229+
222230
if (conn->helo != NULL) {
223231
if (conn->helo->nonce != NULL) {
224232
flb_sds_destroy(conn->helo->nonce);

plugins/in_forward/fw_conn.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#ifndef FLB_IN_FW_CONN_H
2121
#define FLB_IN_FW_CONN_H
2222

23+
#include <fluent-bit/flb_compression.h>
24+
2325
#define FLB_IN_FW_CHUNK_SIZE "1024000" /* 1MB */
2426
#define FLB_IN_FW_CHUNK_MAX_SIZE "6144000" /* =FLB_IN_FW_CHUNK_SIZE * 6. 6MB */
2527
#define FLB_IN_FW_NONCE_SIZE 16
@@ -48,6 +50,10 @@ struct fw_conn {
4850
int buf_size; /* Buffer size */
4951
size_t rest; /* Unpacking offset */
5052

53+
/* Decompression context */
54+
int compression_type; /* e.g., FLB_COMPRESSION_ALGORITHM_GZIP */
55+
struct flb_decompression_context *d_ctx; /* Stateful decompressor context */
56+
5157
struct flb_in_fw_helo *helo; /* secure forward HELO phase */
5258

5359
struct flb_input_instance *in; /* Parent plugin instance */

plugins/in_forward/fw_prot.c

Lines changed: 83 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -90,50 +90,33 @@ static int get_chunk_event_type(struct flb_input_instance *ins, msgpack_object o
9090
return type;
9191
}
9292

93-
static int is_gzip_compressed(msgpack_object options)
93+
static int get_compression_type(msgpack_object options)
9494
{
9595
int i;
96-
msgpack_object k;
97-
msgpack_object v;
96+
msgpack_object k, v;
9897

9998
if (options.type != MSGPACK_OBJECT_MAP) {
10099
return -1;
101100
}
102101

103-
104102
for (i = 0; i < options.via.map.size; i++) {
105103
k = options.via.map.ptr[i].key;
106104
v = options.via.map.ptr[i].val;
107105

108-
if (k.type != MSGPACK_OBJECT_STR) {
109-
return -1;
110-
}
111-
112-
if (k.via.str.size != 10) {
113-
continue;
114-
}
115-
116-
if (strncmp(k.via.str.ptr, "compressed", 10) == 0) {
117-
if (v.type != MSGPACK_OBJECT_STR) {
118-
return -1;
119-
}
120-
121-
if (v.via.str.size != 4) {
122-
return -1;
123-
}
124-
125-
if (strncmp(v.via.str.ptr, "gzip", 4) == 0) {
126-
return FLB_TRUE;
127-
}
128-
else if (strncmp(v.via.str.ptr, "text", 4) == 0) {
129-
return FLB_FALSE;
106+
if (k.type == MSGPACK_OBJECT_STR && k.via.str.size == 10 &&
107+
strncmp(k.via.str.ptr, "compressed", 10) == 0) {
108+
if (v.type == MSGPACK_OBJECT_STR) {
109+
if (v.via.str.size == 4 && strncmp(v.via.str.ptr, "gzip", 4) == 0) {
110+
return FLB_COMPRESSION_ALGORITHM_GZIP;
111+
}
112+
if (v.via.str.size == 4 && strncmp(v.via.str.ptr, "zstd", 4) == 0) {
113+
return FLB_COMPRESSION_ALGORITHM_ZSTD;
114+
}
130115
}
131-
132-
return -1;
133116
}
134117
}
135118

136-
return FLB_FALSE;
119+
return FLB_COMPRESSION_ALGORITHM_NONE;
137120
}
138121

139122
static inline void print_msgpack_error_code(struct flb_input_instance *in,
@@ -1269,6 +1252,8 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
12691252
msgpack_unpacked result;
12701253
msgpack_unpacker *unp;
12711254
size_t all_used = 0;
1255+
const char *payload_data = NULL;
1256+
size_t payload_len = 0;
12721257
struct flb_in_fw_config *ctx = conn->ctx;
12731258

12741259
/*
@@ -1524,91 +1509,99 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
15241509
}
15251510

15261511
if (data) {
1527-
ret = is_gzip_compressed(root.via.array.ptr[2]);
1528-
if (ret == -1) {
1529-
flb_plg_error(ctx->ins, "invalid 'compressed' option");
1530-
msgpack_unpacked_destroy(&result);
1531-
msgpack_unpacker_free(unp);
1532-
flb_sds_destroy(out_tag);
1533-
return -1;
1512+
/* Get event type early for use in both compressed/uncompressed paths */
1513+
event_type = FLB_EVENT_TYPE_LOGS;
1514+
if (contain_options) {
1515+
ret = get_chunk_event_type(ins, root.via.array.ptr[2]);
1516+
if (ret == -1) {
1517+
flb_plg_error(ctx->ins, "invalid chunk event type");
1518+
msgpack_unpacked_destroy(&result);
1519+
flb_sds_destroy(out_tag);
1520+
msgpack_unpacker_free(unp);
1521+
return -1;
1522+
}
1523+
event_type = ret;
15341524
}
15351525

1536-
if (ret == FLB_TRUE) {
1537-
size_t remaining = len;
1538-
1539-
while (remaining > 0) {
1540-
ret = flb_gzip_uncompress_multi((void *) (data + (len - remaining)), remaining,
1541-
&gz_data, &gz_size, &remaining);
1542-
1543-
if (ret == -1) {
1544-
flb_plg_error(ctx->ins, "gzip uncompress failure");
1526+
/* Initialize decompressor on first compressed chunk */
1527+
if (conn->d_ctx == NULL && contain_options) {
1528+
int type = get_compression_type(root.via.array.ptr[2]);
1529+
if (type > 0) {
1530+
conn->compression_type = type;
1531+
conn->d_ctx = flb_decompression_context_create(
1532+
conn->compression_type,
1533+
FLB_DECOMPRESSION_BUFFER_SIZE);
1534+
if (!conn->d_ctx) {
1535+
flb_plg_error(ctx->ins, "failed to create decompression context");
15451536
msgpack_unpacked_destroy(&result);
1546-
msgpack_unpacker_free(unp);
15471537
flb_sds_destroy(out_tag);
1538+
msgpack_unpacker_free(unp);
15481539
return -1;
15491540
}
1541+
}
1542+
}
15501543

1551-
event_type = FLB_EVENT_TYPE_LOGS;
1552-
if (contain_options) {
1553-
ret = get_chunk_event_type(ins, root.via.array.ptr[2]);
1554-
if (ret == -1) {
1555-
msgpack_unpacked_destroy(&result);
1556-
msgpack_unpacker_free(unp);
1557-
flb_sds_destroy(out_tag);
1558-
flb_free(gz_data);
1559-
return -1;
1560-
}
1561-
event_type = ret;
1562-
}
1544+
if (conn->compression_type != FLB_COMPRESSION_ALGORITHM_NONE) {
1545+
char *decomp_buf = NULL;
1546+
uint8_t *append_ptr;
1547+
size_t available_space;
15631548

1564-
ret = append_log(ins, conn,
1565-
event_type,
1566-
out_tag, gz_data, gz_size);
1567-
if (ret == -1) {
1568-
msgpack_unpacked_destroy(&result);
1569-
msgpack_unpacker_free(unp);
1570-
flb_sds_destroy(out_tag);
1571-
flb_free(gz_data);
1549+
available_space = flb_decompression_context_get_available_space(conn->d_ctx);
1550+
if (len > available_space) {
1551+
size_t required_size = conn->d_ctx->input_buffer_length + len;
1552+
if (flb_decompression_context_resize_buffer(conn->d_ctx, required_size) != 0) {
1553+
flb_plg_error(ctx->ins, "cannot resize decompression buffer");
15721554
return -1;
15731555
}
1574-
flb_free(gz_data);
15751556
}
1576-
}
1577-
else {
1578-
event_type = FLB_EVENT_TYPE_LOGS;
1579-
if (contain_options) {
1580-
ret = get_chunk_event_type(ins, root.via.array.ptr[2]);
1581-
if (ret == -1) {
1582-
msgpack_unpacked_destroy(&result);
1583-
msgpack_unpacker_free(unp);
1584-
flb_sds_destroy(out_tag);
1557+
append_ptr = flb_decompression_context_get_append_buffer(conn->d_ctx);
1558+
memcpy(append_ptr, data, len);
1559+
conn->d_ctx->input_buffer_length += len;
1560+
1561+
decomp_buf = flb_malloc(ctx->buffer_chunk_size);
1562+
if (!decomp_buf) {
1563+
flb_errno();
1564+
return -1;
1565+
}
1566+
1567+
do {
1568+
size_t decomp_len = ctx->buffer_chunk_size;
1569+
int decomp_ret = flb_decompress(conn->d_ctx, decomp_buf, &decomp_len);
1570+
1571+
if (decomp_ret == FLB_DECOMPRESSOR_FAILURE) {
1572+
flb_plg_error(ctx->ins, "decompression failed, data may be corrupt");
1573+
flb_free(decomp_buf);
15851574
return -1;
15861575
}
1587-
event_type = ret;
1588-
}
15891576

1590-
ret = append_log(ins, conn,
1591-
event_type,
1592-
out_tag, data, len);
1593-
if (ret == -1) {
1594-
msgpack_unpacked_destroy(&result);
1595-
msgpack_unpacker_free(unp);
1596-
flb_sds_destroy(out_tag);
1577+
if (decomp_len > 0) {
1578+
if (append_log(ins, conn, event_type, out_tag, decomp_buf, decomp_len) == -1) {
1579+
flb_free(decomp_buf);
1580+
return -1;
1581+
}
1582+
}
1583+
} while (ret == 0);
1584+
1585+
flb_free(decomp_buf);
1586+
}
1587+
else {
1588+
if (append_log(ins, conn, event_type, out_tag, data, len) == -1) {
15971589
return -1;
15981590
}
15991591
}
1592+
}
16001593

1601-
/* Handle ACK response */
1602-
if (chunk_id != -1) {
1603-
chunk = root.via.array.ptr[2].via.map.ptr[chunk_id].val;
1604-
send_ack(ctx->ins, conn, chunk);
1605-
}
1594+
/* Handle ACK response (common to all paths) */
1595+
if (chunk_id != -1) {
1596+
chunk = root.via.array.ptr[2].via.map.ptr[chunk_id].val;
1597+
send_ack(ctx->ins, conn, chunk);
16061598
}
16071599
}
16081600
else {
16091601
flb_plg_warn(ctx->ins, "invalid data format, type=%i",
16101602
entry.type);
16111603
msgpack_unpacked_destroy(&result);
1604+
flb_sds_destroy(out_tag);
16121605
msgpack_unpacker_free(unp);
16131606
return -1;
16141607
}

0 commit comments

Comments
 (0)