diff --git a/include/fluent-bit/flb_compression.h b/include/fluent-bit/flb_compression.h index 4d8256cdad1..217f2b5cf66 100644 --- a/include/fluent-bit/flb_compression.h +++ b/include/fluent-bit/flb_compression.h @@ -26,6 +26,7 @@ #define FLB_COMPRESSION_ALGORITHM_NONE 0 #define FLB_COMPRESSION_ALGORITHM_GZIP 1 +#define FLB_COMPRESSION_ALGORITHM_ZSTD 2 #define FLB_DECOMPRESSOR_STATE_FAILED -1 #define FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER 0 diff --git a/include/fluent-bit/flb_zstd.h b/include/fluent-bit/flb_zstd.h index 7b89efa6999..c62d45c9410 100644 --- a/include/fluent-bit/flb_zstd.h +++ b/include/fluent-bit/flb_zstd.h @@ -22,8 +22,16 @@ #include #include +#include + +struct flb_decompression_context; size_t flb_zstd_compress(void *in_data, size_t in_len, void **out_data, size_t *out_len); size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t *out_len); +int flb_zstd_decompressor_dispatch(struct flb_decompression_context *context, + void *output_buffer, + size_t *output_length); +void *flb_zstd_decompression_context_create(void); +void flb_zstd_decompression_context_destroy(void *context); #endif diff --git a/plugins/in_forward/fw_conn.c b/plugins/in_forward/fw_conn.c index edece47d87f..d0e74d5aa43 100644 --- a/plugins/in_forward/fw_conn.c +++ b/plugins/in_forward/fw_conn.c @@ -189,6 +189,9 @@ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_ conn->buf_size = ctx->buffer_chunk_size; conn->in = ctx->ins; + conn->compression_type = FLB_COMPRESSION_ALGORITHM_NONE; + conn->d_ctx = NULL; + /* Register instance into the event loop */ ret = mk_event_add(flb_engine_evl_get(), connection->fd, @@ -219,6 +222,11 @@ int fw_conn_del(struct fw_conn *conn) /* Release resources */ mk_list_del(&conn->_head); + /* Release decompression context if it exists */ + if (conn->d_ctx) { + flb_decompression_context_destroy(conn->d_ctx); + } + if (conn->helo != NULL) { if (conn->helo->nonce != NULL) { flb_sds_destroy(conn->helo->nonce); diff --git a/plugins/in_forward/fw_conn.h b/plugins/in_forward/fw_conn.h index 6e24c022767..380a7899192 100644 --- a/plugins/in_forward/fw_conn.h +++ b/plugins/in_forward/fw_conn.h @@ -20,6 +20,8 @@ #ifndef FLB_IN_FW_CONN_H #define FLB_IN_FW_CONN_H +#include + #define FLB_IN_FW_CHUNK_SIZE "1024000" /* 1MB */ #define FLB_IN_FW_CHUNK_MAX_SIZE "6144000" /* =FLB_IN_FW_CHUNK_SIZE * 6. 6MB */ #define FLB_IN_FW_NONCE_SIZE 16 @@ -48,6 +50,10 @@ struct fw_conn { int buf_size; /* Buffer size */ size_t rest; /* Unpacking offset */ + /* Decompression context */ + int compression_type; /* e.g., FLB_COMPRESSION_ALGORITHM_GZIP */ + struct flb_decompression_context *d_ctx; /* Stateful decompressor context */ + struct flb_in_fw_helo *helo; /* secure forward HELO phase */ struct flb_input_instance *in; /* Parent plugin instance */ diff --git a/plugins/in_forward/fw_prot.c b/plugins/in_forward/fw_prot.c index 4ae7636aa0d..bcaabce4623 100644 --- a/plugins/in_forward/fw_prot.c +++ b/plugins/in_forward/fw_prot.c @@ -90,50 +90,33 @@ static int get_chunk_event_type(struct flb_input_instance *ins, msgpack_object o return type; } -static int is_gzip_compressed(msgpack_object options) +static int get_compression_type(msgpack_object options) { int i; - msgpack_object k; - msgpack_object v; + msgpack_object k, v; if (options.type != MSGPACK_OBJECT_MAP) { return -1; } - for (i = 0; i < options.via.map.size; i++) { k = options.via.map.ptr[i].key; v = options.via.map.ptr[i].val; - if (k.type != MSGPACK_OBJECT_STR) { - return -1; - } - - if (k.via.str.size != 10) { - continue; - } - - if (strncmp(k.via.str.ptr, "compressed", 10) == 0) { - if (v.type != MSGPACK_OBJECT_STR) { - return -1; - } - - if (v.via.str.size != 4) { - return -1; - } - - if (strncmp(v.via.str.ptr, "gzip", 4) == 0) { - return FLB_TRUE; - } - else if (strncmp(v.via.str.ptr, "text", 4) == 0) { - return FLB_FALSE; + if (k.type == MSGPACK_OBJECT_STR && k.via.str.size == 10 && + strncmp(k.via.str.ptr, "compressed", 10) == 0) { + if (v.type == MSGPACK_OBJECT_STR) { + if (v.via.str.size == 4 && strncmp(v.via.str.ptr, "gzip", 4) == 0) { + return FLB_COMPRESSION_ALGORITHM_GZIP; + } + if (v.via.str.size == 4 && strncmp(v.via.str.ptr, "zstd", 4) == 0) { + return FLB_COMPRESSION_ALGORITHM_ZSTD; + } } - - return -1; } } - return FLB_FALSE; + return FLB_COMPRESSION_ALGORITHM_NONE; } static inline void print_msgpack_error_code(struct flb_input_instance *in, @@ -1269,6 +1252,8 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) msgpack_unpacked result; msgpack_unpacker *unp; size_t all_used = 0; + const char *payload_data = NULL; + size_t payload_len = 0; struct flb_in_fw_config *ctx = conn->ctx; /* @@ -1524,91 +1509,115 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) } if (data) { - ret = is_gzip_compressed(root.via.array.ptr[2]); - if (ret == -1) { - flb_plg_error(ctx->ins, "invalid 'compressed' option"); - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; + /* Get event type early for use in both compressed/uncompressed paths */ + event_type = FLB_EVENT_TYPE_LOGS; + if (contain_options) { + ret = get_chunk_event_type(ins, root.via.array.ptr[2]); + if (ret == -1) { + flb_plg_error(ctx->ins, "invalid chunk event type"); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(out_tag); + msgpack_unpacker_free(unp); + return -1; + } + event_type = ret; } - if (ret == FLB_TRUE) { - size_t remaining = len; - - while (remaining > 0) { - ret = flb_gzip_uncompress_multi((void *) (data + (len - remaining)), remaining, - &gz_data, &gz_size, &remaining); - - if (ret == -1) { - flb_plg_error(ctx->ins, "gzip uncompress failure"); + /* Initialize decompressor on first compressed chunk */ + if (conn->d_ctx == NULL && contain_options) { + int type = get_compression_type(root.via.array.ptr[2]); + if (type > 0) { + conn->compression_type = type; + conn->d_ctx = flb_decompression_context_create( + conn->compression_type, + FLB_DECOMPRESSION_BUFFER_SIZE); + if (!conn->d_ctx) { + flb_plg_error(ctx->ins, "failed to create decompression context"); msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); flb_sds_destroy(out_tag); + msgpack_unpacker_free(unp); + return -1; + } + } + } + + if (conn->compression_type != FLB_COMPRESSION_ALGORITHM_NONE) { + char *decomp_buf = NULL; + uint8_t *append_ptr; + size_t available_space; + size_t decomp_len; + int decomp_ret; + size_t required_size; + + available_space = flb_decompression_context_get_available_space(conn->d_ctx); + if (len > available_space) { + required_size = conn->d_ctx->input_buffer_length + len; + if (flb_decompression_context_resize_buffer(conn->d_ctx, required_size) != 0) { + flb_plg_error(ctx->ins, "cannot resize decompression buffer"); return -1; } + } + append_ptr = flb_decompression_context_get_append_buffer(conn->d_ctx); + memcpy(append_ptr, data, len); + conn->d_ctx->input_buffer_length += len; - event_type = FLB_EVENT_TYPE_LOGS; - if (contain_options) { - ret = get_chunk_event_type(ins, root.via.array.ptr[2]); - if (ret == -1) { + decomp_buf = flb_malloc(ctx->buffer_chunk_size); + if (!decomp_buf) { + flb_errno(); + return -1; + } + + do { + decomp_len = ctx->buffer_chunk_size; + decomp_ret = flb_decompress(conn->d_ctx, decomp_buf, &decomp_len); + + if (decomp_ret == FLB_DECOMPRESSOR_FAILURE) { + if (decomp_len > 0) { + flb_plg_error(ctx->ins, "decompression failed, data may be corrupt"); + flb_free(decomp_buf); msgpack_unpacked_destroy(&result); msgpack_unpacker_free(unp); flb_sds_destroy(out_tag); - flb_free(gz_data); return -1; } - event_type = ret; + break; } - ret = append_log(ins, conn, - event_type, - out_tag, gz_data, gz_size); - if (ret == -1) { - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - flb_free(gz_data); - return -1; + if (decomp_len > 0) { + if (append_log(ins, conn, event_type, out_tag, decomp_buf, decomp_len) == -1) { + flb_free(decomp_buf); + msgpack_unpacked_destroy(&result); + msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); + return -1; + } } - flb_free(gz_data); - } + } while (decomp_len > 0); + + flb_free(decomp_buf); + + flb_decompression_context_destroy(conn->d_ctx); + conn->d_ctx = NULL; + conn->compression_type = FLB_COMPRESSION_ALGORITHM_NONE; } else { - event_type = FLB_EVENT_TYPE_LOGS; - if (contain_options) { - ret = get_chunk_event_type(ins, root.via.array.ptr[2]); - if (ret == -1) { - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); - return -1; - } - event_type = ret; - } - - ret = append_log(ins, conn, - event_type, - out_tag, data, len); - if (ret == -1) { - msgpack_unpacked_destroy(&result); - msgpack_unpacker_free(unp); - flb_sds_destroy(out_tag); + if (append_log(ins, conn, event_type, out_tag, data, len) == -1) { return -1; } } + } - /* Handle ACK response */ - if (chunk_id != -1) { - chunk = root.via.array.ptr[2].via.map.ptr[chunk_id].val; - send_ack(ctx->ins, conn, chunk); - } + /* Handle ACK response (common to all paths) */ + if (chunk_id != -1) { + chunk = root.via.array.ptr[2].via.map.ptr[chunk_id].val; + send_ack(ctx->ins, conn, chunk); } } else { flb_plg_warn(ctx->ins, "invalid data format, type=%i", entry.type); msgpack_unpacked_destroy(&result); + flb_sds_destroy(out_tag); msgpack_unpacker_free(unp); return -1; } diff --git a/src/flb_compression.c b/src/flb_compression.c index 5d6f57f9625..59d4ae7b65c 100644 --- a/src/flb_compression.c +++ b/src/flb_compression.c @@ -21,6 +21,7 @@ #include #include #include +#include #include static size_t flb_decompression_context_get_read_buffer_offset( @@ -131,7 +132,12 @@ void flb_decompression_context_destroy(struct flb_decompression_context *context } if (context->inner_context != NULL) { - flb_gzip_decompression_context_destroy(context->inner_context); + if (context->algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) { + flb_gzip_decompression_context_destroy(context->inner_context); + } + else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) { + flb_zstd_decompression_context_destroy(context->inner_context); + } context->inner_context = NULL; } @@ -178,6 +184,9 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) { context->inner_context = flb_gzip_decompression_context_create(); } + else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) { + context->inner_context = flb_zstd_decompression_context_create(); + } else { flb_error("invalid compression algorithm : %d", algorithm); @@ -197,9 +206,14 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm } context->input_buffer_size = input_buffer_size; - context->read_buffer = context->read_buffer; + context->read_buffer = context->input_buffer; context->algorithm = algorithm; - context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER; + if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) { + context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER; + } + else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) { + context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_BODY; + } return context; } @@ -215,6 +229,12 @@ int flb_decompress(struct flb_decompression_context *context, output_length); } + else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) { + return flb_zstd_decompressor_dispatch(context, + output_buffer, + output_length); + + } } return FLB_DECOMPRESSOR_FAILURE; diff --git a/src/flb_gzip.c b/src/flb_gzip.c index b225db35310..fd9c0ee8675 100644 --- a/src/flb_gzip.c +++ b/src/flb_gzip.c @@ -690,9 +690,11 @@ static int flb_gzip_decompressor_process_header( /* Minimal length: header + crc32 */ if (context->input_buffer_length < FLB_GZIP_HEADER_SIZE) { - flb_error("[gzip] unexpected content length"); - - return FLB_DECOMPRESSOR_FAILURE; + /* + * This is not a fatal error; it's the expected condition when waiting + * for more data. Return INSUFFICIENT_DATA without logging an error. + */ + return FLB_DECOMPRESSOR_INSUFFICIENT_DATA; } memcpy(&inner_context->gzip_header, diff --git a/src/flb_zstd.c b/src/flb_zstd.c index aa1ff7ae2fc..43e0f3fcddb 100644 --- a/src/flb_zstd.c +++ b/src/flb_zstd.c @@ -24,6 +24,9 @@ #include #include +struct flb_zstd_decompression_context { + ZSTD_DCtx *dctx; +}; #define FLB_ZSTD_DEFAULT_CHUNK 64 * 1024 /* 64 KB buffer */ @@ -162,3 +165,102 @@ size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t return 0; } +int flb_zstd_decompressor_dispatch(struct flb_decompression_context *context, + void *output_buffer, + size_t *output_length) +{ + struct flb_zstd_decompression_context *zstd_ctx; + size_t compressed_frame_size; + size_t decompressed_size; + size_t original_output_length; + size_t error_code; + + if (context == NULL || context->inner_context == NULL || output_length == NULL) { + return FLB_DECOMPRESSOR_FAILURE; + } + + zstd_ctx = (struct flb_zstd_decompression_context *) context->inner_context; + original_output_length = *output_length; + *output_length = 0; + + if (context->input_buffer_length == 0) { + return FLB_DECOMPRESSOR_SUCCESS; + } + + compressed_frame_size = ZSTD_findFrameCompressedSize(context->read_buffer, + context->input_buffer_length); + + error_code = ZSTD_getErrorCode(compressed_frame_size); + + /* + * Distinguish between recoverable and fatal errors. + * If we get srcSize_wrong, it just means we need more data to find the + * end of the frame. This is expected in a streaming scenario. + */ + if (error_code == ZSTD_error_srcSize_wrong) { + /* Not an error, just need more data. Return success with 0 bytes produced. */ + return FLB_DECOMPRESSOR_SUCCESS; + } + + /* Check for any other, truly fatal error from finding the frame. */ + if (ZSTD_isError(compressed_frame_size)) { + flb_error("[zstd] frame is corrupted: %s", + ZSTD_getErrorName(compressed_frame_size)); + context->state = FLB_DECOMPRESSOR_STATE_FAILED; + return FLB_DECOMPRESSOR_FAILURE; + } + + /* We have a full frame. Decompress it in one shot using the robust API. */ + decompressed_size = ZSTD_decompressDCtx(zstd_ctx->dctx, + output_buffer, + original_output_length, + context->read_buffer, + compressed_frame_size); + + if (ZSTD_isError(decompressed_size)) { + flb_error("[zstd] decompression failed: %s", + ZSTD_getErrorName(decompressed_size)); + context->state = FLB_DECOMPRESSOR_STATE_FAILED; + return FLB_DECOMPRESSOR_FAILURE; + } + + /* Success. Update our pointers and report the decompressed size. */ + context->read_buffer += compressed_frame_size; + context->input_buffer_length -= compressed_frame_size; + *output_length = decompressed_size; + + return FLB_DECOMPRESSOR_SUCCESS; +} + +void *flb_zstd_decompression_context_create(void) +{ + struct flb_zstd_decompression_context *context; + + context = flb_calloc(1, sizeof(struct flb_zstd_decompression_context)); + + if (context == NULL) { + flb_errno(); + return NULL; + } + + context->dctx = ZSTD_createDCtx(); + if (context->dctx == NULL) { + flb_error("[zstd] could not create decompression context"); + flb_free(context); + return NULL; + } + + return (void *) context; +} + +void flb_zstd_decompression_context_destroy(void *context) +{ + struct flb_zstd_decompression_context *zstd_ctx = context; + + if (zstd_ctx != NULL) { + if (zstd_ctx->dctx != NULL) { + ZSTD_freeDCtx(zstd_ctx->dctx); + } + flb_free(zstd_ctx); + } +} diff --git a/tests/internal/zstd.c b/tests/internal/zstd.c index 17efdf09e52..a8e186f09b4 100644 --- a/tests/internal/zstd.c +++ b/tests/internal/zstd.c @@ -3,6 +3,7 @@ #include #include #include +#include #include "flb_tests_internal.h" /* try a small string */ @@ -163,6 +164,159 @@ static void test_decompress_unknown_size() flb_free(decompressed_data); } +static void append_to_context(struct flb_decompression_context *ctx, const void *data, size_t size) +{ + uint8_t *append_ptr; + size_t available_space; + + available_space = flb_decompression_context_get_available_space(ctx); + + if (size > available_space) { + size_t required_size = ctx->input_buffer_length + size; + flb_decompression_context_resize_buffer(ctx, required_size); + } + + /* Get pointer to the write location */ + append_ptr = flb_decompression_context_get_append_buffer(ctx); + TEST_CHECK(append_ptr != NULL); + + /* Copy the data */ + memcpy(append_ptr, data, size); + + ctx->input_buffer_length += size; +} + +static void *compress_with_checksum(const void *original_data, size_t original_len, + size_t *compressed_len) +{ + ZSTD_CCtx* cctx; + void *compressed_buffer; + size_t bound; + size_t ret; + + /* Create a compression context */ + cctx = ZSTD_createCCtx(); + TEST_CHECK(cctx != NULL); + + /* + * THIS IS THE KEY: Explicitly enable content checksums in the frame. + */ + ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1); + TEST_CHECK(!ZSTD_isError(ret)); + + /* Compress the data */ + bound = ZSTD_compressBound(original_len); + compressed_buffer = flb_malloc(bound); + TEST_CHECK(compressed_buffer != NULL); + + *compressed_len = ZSTD_compress2(cctx, + compressed_buffer, bound, + original_data, original_len); + + TEST_CHECK(!ZSTD_isError(*compressed_len)); + + ZSTD_freeCCtx(cctx); + + return compressed_buffer; +} + +void test_zstd_streaming_decompress_multi_chunk(void) +{ + struct flb_decompression_context *ctx; + char *output_buf; + size_t output_len; + size_t total_written = 0; + int ret; + size_t chunk1_size = 0; + size_t chunk2_size = 0; + size_t chunk3_size = 0; + char *original_text = "zstd streaming is a feature that must be tested with multiple, uneven chunks!"; + size_t original_len; + void *compressed_buf = NULL; + size_t compressed_len = 0; + + original_len = strlen(original_text); + compressed_buf = compress_with_checksum(original_text, original_len, &compressed_len); + TEST_CHECK(compressed_buf != NULL); + + ctx = flb_decompression_context_create(FLB_COMPRESSION_ALGORITHM_ZSTD, compressed_len); + TEST_CHECK(ctx != NULL); + output_buf = flb_malloc(original_len + 1); + + chunk1_size = compressed_len / 3; + chunk2_size = compressed_len / 2; + chunk3_size = compressed_len - chunk1_size - chunk2_size; + + append_to_context(ctx, compressed_buf, chunk1_size); + output_len = original_len; + ret = flb_decompress(ctx, output_buf, &output_len); + TEST_CHECK(ret == FLB_DECOMPRESSOR_SUCCESS); + total_written += output_len; + + append_to_context(ctx, (char *)compressed_buf + chunk1_size, chunk2_size); + output_len = original_len - total_written; + ret = flb_decompress(ctx, output_buf + total_written, &output_len); + TEST_CHECK(ret == FLB_DECOMPRESSOR_SUCCESS); + total_written += output_len; + + append_to_context(ctx, (char *)compressed_buf + chunk1_size + chunk2_size, chunk3_size); + output_len = original_len - total_written; + ret = flb_decompress(ctx, output_buf + total_written, &output_len); + TEST_CHECK(ret == FLB_DECOMPRESSOR_SUCCESS); + total_written += output_len; + + TEST_CHECK(total_written == original_len); + TEST_CHECK(memcmp(original_text, output_buf, original_len) == 0); + + flb_free(compressed_buf); + flb_free(output_buf); + flb_decompression_context_destroy(ctx); +} + +/* In tests/internal/zstd.c */ + +void test_zstd_streaming_decompress_corrupted_data(void) +{ + struct flb_decompression_context *ctx; + char *output_buf; + size_t output_len; + int ret; + char *original_text = "this test ensures corrupted data with a checksum fails"; + size_t original_len = strlen(original_text); + void *compressed_buf = NULL; + size_t compressed_len = 0; + char *corrupted_input; + + compressed_buf = compress_with_checksum(original_text, original_len, &compressed_len); + TEST_CHECK(compressed_buf != NULL); + + ctx = flb_decompression_context_create(FLB_COMPRESSION_ALGORITHM_ZSTD, compressed_len); + TEST_CHECK(ctx != NULL); + + /* Create a corrupted copy of the input */ + corrupted_input = flb_malloc(compressed_len); + TEST_CHECK(corrupted_input != NULL); + memcpy(corrupted_input, compressed_buf, compressed_len); + /* Corrupt a byte in the middle */ + corrupted_input[compressed_len / 2]++; + + append_to_context(ctx, corrupted_input, compressed_len); + + output_buf = flb_malloc(original_len + 1); + output_len = original_len; + + ret = flb_decompress(ctx, output_buf, &output_len); + + TEST_CHECK(ret == FLB_DECOMPRESSOR_FAILURE); + TEST_CHECK(ctx->state == FLB_DECOMPRESSOR_STATE_FAILED); + + flb_free(compressed_buf); + flb_free(corrupted_input); + flb_free(output_buf); + flb_decompression_context_destroy(ctx); +} + + TEST_LIST = { { "compress_small_string", test_compress_small_string }, { "decompress_small_string", test_decompress_small_string }, @@ -170,5 +324,7 @@ TEST_LIST = { { "decompress_invalid_data", test_decompress_invalid_data }, { "compress_decompress_large_data", test_compress_decompress_large_data }, { "decompress_unknown_size", test_decompress_unknown_size }, + { "streaming_decompress_multi_chunk", test_zstd_streaming_decompress_multi_chunk }, + { "streaming_decompress_corrupted_data", test_zstd_streaming_decompress_corrupted_data }, { NULL, NULL } }; diff --git a/tests/runtime/in_forward.c b/tests/runtime/in_forward.c index 6cabfa94a95..7577bae01ac 100644 --- a/tests/runtime/in_forward.c +++ b/tests/runtime/in_forward.c @@ -23,6 +23,9 @@ #include #include #include +#include +#include +#include #include #include #ifdef FLB_HAVE_UNIX_SOCKET @@ -565,6 +568,228 @@ void flb_test_unix_perm() } #endif /* FLB_HAVE_UNIX_SOCKET */ +/* + * Creates a forward-protocol-compliant, Gzip-compressed MessagePack payload. + * The final structure is: [tag, compressed_events, {options}] + */ +static int create_simple_json_gzip(msgpack_sbuffer *sbuf) +{ + int ret; + char *event_buf; + size_t event_size; + char *compressed_buf; + size_t compressed_size; + int root_type; + msgpack_packer pck; + + char *tag = "test"; + char event_json[] = "[1234567890,{\"test\":\"msg\"}]"; + + ret = flb_pack_json(event_json, strlen(event_json), + &event_buf, &event_size, &root_type, NULL); + if (!TEST_CHECK(ret == 0)) { + return -1; + } + + ret = flb_gzip_compress(event_buf, event_size, + (void **)&compressed_buf, &compressed_size); + if (!TEST_CHECK(ret == 0)) { + flb_free(event_buf); + return -1; + } + flb_free(event_buf); + + /* Create temporary msgpack buffer */ + msgpack_packer_init(&pck, sbuf, msgpack_sbuffer_write); + + msgpack_pack_array(&pck, 3); + msgpack_pack_str_with_body(&pck, tag, strlen(tag)); + msgpack_pack_bin_with_body(&pck, compressed_buf, compressed_size); + msgpack_pack_map(&pck, 2); + msgpack_pack_str_with_body(&pck, "compressed", 10); + msgpack_pack_str_with_body(&pck, "gzip", 4); + msgpack_pack_str_with_body(&pck, "size", 4); + msgpack_pack_uint64(&pck, event_size); + + flb_free(compressed_buf); + + return 0; +} + +void flb_test_forward_gzip() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + + char *buf; + size_t size; + + msgpack_sbuffer sbuf; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"test\":\"msg\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "test", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + exit(EXIT_FAILURE); + } + + msgpack_sbuffer_init(&sbuf); + create_simple_json_gzip(&sbuf); + + w_size = send(fd, sbuf.data, sbuf.size, 0); + if (!TEST_CHECK(w_size == sbuf.size)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + msgpack_sbuffer_destroy(&sbuf); + exit(EXIT_FAILURE); + } + + msgpack_sbuffer_destroy(&sbuf); + + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + +/* + * Creates a forward-protocol-compliant, Zstd-compressed MessagePack payload. + * The final structure is: [tag, compressed_events, {options}] + */ +static int create_simple_json_zstd(msgpack_sbuffer *sbuf) +{ + int ret; + char *event_buf; + size_t event_size; + char *compressed_buf; + size_t compressed_size; + int root_type; + msgpack_packer pck; + + char *tag = "test"; + char event_json[] = "[1234567890,{\"test\":\"msg\"}]"; + + ret = flb_pack_json(event_json, strlen(event_json), + &event_buf, &event_size, &root_type, NULL); + if (!TEST_CHECK(ret == 0)) { + return -1; + } + + ret = flb_zstd_compress(event_buf, event_size, + (void **)&compressed_buf, &compressed_size); + if (!TEST_CHECK(ret == 0)) { + flb_free(event_buf); + return -1; + } + flb_free(event_buf); + + /* Create temporary msgpack buffer */ + msgpack_packer_init(&pck, sbuf, msgpack_sbuffer_write); + + msgpack_pack_array(&pck, 3); + msgpack_pack_str_with_body(&pck, tag, strlen(tag)); + msgpack_pack_bin_with_body(&pck, compressed_buf, compressed_size); + msgpack_pack_map(&pck, 2); + msgpack_pack_str_with_body(&pck, "compressed", 10); + msgpack_pack_str_with_body(&pck, "zstd", 4); + msgpack_pack_str_with_body(&pck, "size", 4); + msgpack_pack_uint64(&pck, event_size); + + flb_free(compressed_buf); + + return 0; +} + +void flb_test_forward_zstd() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + + char *buf; + size_t size; + + msgpack_sbuffer sbuf; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"test\":\"msg\""; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "test", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + exit(EXIT_FAILURE); + } + + msgpack_sbuffer_init(&sbuf); + create_simple_json_zstd(&sbuf); + + w_size = send(fd, sbuf.data, sbuf.size, 0); + if (!TEST_CHECK(w_size == sbuf.size)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + msgpack_sbuffer_destroy(&sbuf); + exit(EXIT_FAILURE); + } + + msgpack_sbuffer_destroy(&sbuf); + + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + TEST_LIST = { {"forward", flb_test_forward}, @@ -574,6 +799,7 @@ TEST_LIST = { {"unix_path", flb_test_unix_path}, {"unix_perm", flb_test_unix_perm}, #endif + {"forward_gzip", flb_test_forward_gzip}, + {"forward_zstd", flb_test_forward_zstd}, {NULL, NULL} }; -