diff --git a/include/fluent-bit/flb_compression.h b/include/fluent-bit/flb_compression.h
index 4d8256cdad1..217f2b5cf66 100644
--- a/include/fluent-bit/flb_compression.h
+++ b/include/fluent-bit/flb_compression.h
@@ -26,6 +26,7 @@

 #define FLB_COMPRESSION_ALGORITHM_NONE 0
 #define FLB_COMPRESSION_ALGORITHM_GZIP 1
+#define FLB_COMPRESSION_ALGORITHM_ZSTD 2

 #define FLB_DECOMPRESSOR_STATE_FAILED -1
 #define FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER 0
diff --git a/include/fluent-bit/flb_zstd.h b/include/fluent-bit/flb_zstd.h
index 7b89efa6999..c62d45c9410 100644
--- a/include/fluent-bit/flb_zstd.h
+++ b/include/fluent-bit/flb_zstd.h
@@ -22,8 +22,18 @@

 #include
 #include
+#include
+
+struct flb_decompression_context;

 size_t flb_zstd_compress(void *in_data, size_t in_len, void **out_data, size_t *out_len);
 size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t *out_len);
+
+/* zstd backend hooks called by the flb_decompression_context API */
+int flb_zstd_decompressor_dispatch(struct flb_decompression_context *context,
+                                   void *output_buffer,
+                                   size_t *output_length);
+void *flb_zstd_decompression_context_create(void);
+void flb_zstd_decompression_context_destroy(void *context);

 #endif
diff --git a/src/flb_compression.c b/src/flb_compression.c
index 5d6f57f9625..59d4ae7b65c 100644
--- a/src/flb_compression.c
+++ b/src/flb_compression.c
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include
 #include

 static size_t flb_decompression_context_get_read_buffer_offset(
@@ -131,7 +132,12 @@ void flb_decompression_context_destroy(struct flb_decompression_context *context
     }

     if (context->inner_context != NULL) {
-        flb_gzip_decompression_context_destroy(context->inner_context);
+        if (context->algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
+            flb_gzip_decompression_context_destroy(context->inner_context);
+        }
+        else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
+            flb_zstd_decompression_context_destroy(context->inner_context);
+        }

         context->inner_context = NULL;
     }
@@ -178,6 +184,9 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm

     if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
         context->inner_context = flb_gzip_decompression_context_create();
     }
+    else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
+        context->inner_context = flb_zstd_decompression_context_create();
+    }
     else {
         flb_error("invalid compression algorithm : %d", algorithm);
@@ -197,9 +206,14 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm
     }

     context->input_buffer_size = input_buffer_size;
-    context->read_buffer = context->read_buffer;
+    context->read_buffer = context->input_buffer;
     context->algorithm = algorithm;
-    context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER;
+    if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
+        context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER;
+    }
+    else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
+        context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_BODY;
+    }

     return context;
 }
@@ -215,6 +229,12 @@ int flb_decompress(struct flb_decompression_context *context,
                                                   output_length);

         }
+        else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
+            return flb_zstd_decompressor_dispatch(context,
+                                                  output_buffer,
+                                                  output_length);
+
+        }
     }

     return FLB_DECOMPRESSOR_FAILURE;
diff --git a/src/flb_gzip.c b/src/flb_gzip.c
index b225db35310..fd9c0ee8675 100644
--- a/src/flb_gzip.c
+++ b/src/flb_gzip.c
@@ -690,9 +690,11 @@ static int flb_gzip_decompressor_process_header(

     /* Minimal length: header + crc32 */
     if (context->input_buffer_length < FLB_GZIP_HEADER_SIZE) {
-        flb_error("[gzip] unexpected content length");
-
-        return FLB_DECOMPRESSOR_FAILURE;
+        /*
+         * This is not a fatal error; it's the expected condition when waiting
+         * for more data. Return INSUFFICIENT_DATA without logging an error.
+         */
+        return FLB_DECOMPRESSOR_INSUFFICIENT_DATA;
     }

     memcpy(&inner_context->gzip_header,
diff --git a/src/flb_zstd.c b/src/flb_zstd.c
index aa1ff7ae2fc..43e0f3fcddb 100644
--- a/src/flb_zstd.c
+++ b/src/flb_zstd.c
@@ -24,6 +24,10 @@
 #include
 #include

+/* zstd state stored in flb_decompression_context->inner_context */
+struct flb_zstd_decompression_context {
+    ZSTD_DCtx *dctx;
+};

 #define FLB_ZSTD_DEFAULT_CHUNK 64 * 1024  /* 64 KB buffer */

@@ -162,3 +166,122 @@ size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t

     return 0;
 }
+
+/*
+ * Decode at most one complete zstd frame from the context's input buffer.
+ *
+ * On entry *output_length is the capacity of output_buffer; on return it is
+ * the number of bytes written, which is 0 when the buffered input does not
+ * yet contain a whole frame. A decoded frame is consumed from the input.
+ *
+ * Returns FLB_DECOMPRESSOR_SUCCESS when a frame was decoded or more input is
+ * needed, and FLB_DECOMPRESSOR_FAILURE on invalid arguments or when zstd
+ * reports an error (which also puts the context in the FAILED state).
+ */
+int flb_zstd_decompressor_dispatch(struct flb_decompression_context *context,
+                                   void *output_buffer,
+                                   size_t *output_length)
+{
+    struct flb_zstd_decompression_context *zstd_ctx;
+    size_t output_capacity;
+    size_t frame_size;
+    size_t written;
+
+    if (context == NULL || context->inner_context == NULL ||
+        output_length == NULL) {
+        return FLB_DECOMPRESSOR_FAILURE;
+    }
+
+    zstd_ctx = (struct flb_zstd_decompression_context *) context->inner_context;
+    output_capacity = *output_length;
+    *output_length = 0;
+
+    if (context->input_buffer_length == 0) {
+        return FLB_DECOMPRESSOR_SUCCESS;
+    }
+
+    frame_size = ZSTD_findFrameCompressedSize(context->read_buffer,
+                                              context->input_buffer_length);
+
+    if (ZSTD_isError(frame_size)) {
+        /*
+         * srcSize_wrong only means the end of the frame is not buffered yet,
+         * which is expected while streaming: report success with zero bytes
+         * produced and let the caller append more input.
+         */
+        if (ZSTD_getErrorCode(frame_size) == ZSTD_error_srcSize_wrong) {
+            return FLB_DECOMPRESSOR_SUCCESS;
+        }
+
+        flb_error("[zstd] frame is corrupted: %s",
+                  ZSTD_getErrorName(frame_size));
+        context->state = FLB_DECOMPRESSOR_STATE_FAILED;
+        return FLB_DECOMPRESSOR_FAILURE;
+    }
+
+    /*
+     * Caller error rather than stream corruption: refuse to hand zstd a NULL
+     * destination with a non-zero capacity, but leave the state untouched.
+     */
+    if (output_buffer == NULL && output_capacity > 0) {
+        return FLB_DECOMPRESSOR_FAILURE;
+    }
+
+    /* A whole frame is buffered: decode it in a single call */
+    written = ZSTD_decompressDCtx(zstd_ctx->dctx,
+                                  output_buffer, output_capacity,
+                                  context->read_buffer, frame_size);
+    if (ZSTD_isError(written)) {
+        flb_error("[zstd] decompression failed: %s",
+                  ZSTD_getErrorName(written));
+        context->state = FLB_DECOMPRESSOR_STATE_FAILED;
+        return FLB_DECOMPRESSOR_FAILURE;
+    }
+
+    /* Consume the frame and report how much output was produced */
+    context->read_buffer += frame_size;
+    context->input_buffer_length -= frame_size;
+    *output_length = written;
+
+    return FLB_DECOMPRESSOR_SUCCESS;
+}
+
+/*
+ * Allocate the zstd-specific decompression state.
+ *
+ * Returns an opaque pointer to release with
+ * flb_zstd_decompression_context_destroy(), or NULL on failure.
+ */
+void *flb_zstd_decompression_context_create(void)
+{
+    struct flb_zstd_decompression_context *context;
+
+    context = flb_calloc(1, sizeof(*context));
+    if (context == NULL) {
+        flb_errno();
+        return NULL;
+    }
+
+    context->dctx = ZSTD_createDCtx();
+    if (context->dctx == NULL) {
+        flb_error("[zstd] could not create decompression context");
+        flb_free(context);
+        return NULL;
+    }
+
+    return context;
+}
+
+/* Release a state object created by flb_zstd_decompression_context_create() */
+void flb_zstd_decompression_context_destroy(void *context)
+{
+    struct flb_zstd_decompression_context *zstd_ctx = context;
+
+    if (zstd_ctx == NULL) {
+        return;
+    }
+
+    /* ZSTD_freeDCtx() is a no-op on NULL, so dctx needs no separate guard */
+    ZSTD_freeDCtx(zstd_ctx->dctx);
+    flb_free(zstd_ctx);
+}
diff --git a/tests/internal/zstd.c b/tests/internal/zstd.c
index 17efdf09e52..a8e186f09b4 100644
--- a/tests/internal/zstd.c
+++ b/tests/internal/zstd.c
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include
 #include "flb_tests_internal.h"

 /* try a small string */
@@ -163,6 +164,209 @@ static void test_decompress_unknown_size()
     flb_free(decompressed_data);
 }

+/*
+ * Append 'size' bytes to the context's input buffer, asking the context to
+ * grow the buffer first when the free space is too small.
+ *
+ * Returns 0 on success, -1 (after recording a failed check) otherwise.
+ */
+static int append_to_context(struct flb_decompression_context *ctx,
+                             const void *data, size_t size)
+{
+    uint8_t *append_ptr;
+
+    if (size > flb_decompression_context_get_available_space(ctx)) {
+        flb_decompression_context_resize_buffer(ctx,
+                                                ctx->input_buffer_length + size);
+    }
+
+    /* Never copy unless the (possibly resized) buffer can really hold it */
+    if (!TEST_CHECK(size <= flb_decompression_context_get_available_space(ctx))) {
+        return -1;
+    }
+
+    append_ptr = flb_decompression_context_get_append_buffer(ctx);
+    if (!TEST_CHECK(append_ptr != NULL)) {
+        return -1;
+    }
+
+    memcpy(append_ptr, data, size);
+    ctx->input_buffer_length += size;
+
+    return 0;
+}
+
+/*
+ * Compress 'original_data' into one zstd frame with the content checksum
+ * enabled.
+ *
+ * Returns a buffer to release with flb_free() and stores its length in
+ * *compressed_len, or returns NULL (with *compressed_len set to 0) on failure.
+ */
+static void *compress_with_checksum(const void *original_data, size_t original_len,
+                                    size_t *compressed_len)
+{
+    ZSTD_CCtx *cctx;
+    void *compressed_buffer = NULL;
+    size_t bound;
+    size_t ret;
+
+    *compressed_len = 0;
+
+    cctx = ZSTD_createCCtx();
+    if (!TEST_CHECK(cctx != NULL)) {
+        return NULL;
+    }
+
+    /* Explicitly enable the content checksum in the produced frame */
+    ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
+    if (!TEST_CHECK(!ZSTD_isError(ret))) {
+        goto error;
+    }
+
+    bound = ZSTD_compressBound(original_len);
+    compressed_buffer = flb_malloc(bound);
+    if (!TEST_CHECK(compressed_buffer != NULL)) {
+        goto error;
+    }
+
+    ret = ZSTD_compress2(cctx, compressed_buffer, bound,
+                         original_data, original_len);
+    if (!TEST_CHECK(!ZSTD_isError(ret))) {
+        goto error;
+    }
+
+    *compressed_len = ret;
+    ZSTD_freeCCtx(cctx);
+
+    return compressed_buffer;
+
+error:
+    if (compressed_buffer != NULL) {
+        flb_free(compressed_buffer);
+    }
+    ZSTD_freeCCtx(cctx);
+
+    return NULL;
+}
+
+/* Feed one frame in three uneven chunks and check the reassembled output */
+void test_zstd_streaming_decompress_multi_chunk(void)
+{
+    struct flb_decompression_context *ctx = NULL;
+    const char *original_text = "zstd streaming is a feature that must be tested with multiple, uneven chunks!";
+    char *output_buf = NULL;
+    void *compressed_buf;
+    size_t original_len;
+    size_t compressed_len = 0;
+    size_t chunk_sizes[3];
+    size_t consumed = 0;
+    size_t total_written = 0;
+    size_t output_len;
+    size_t i;
+    int ret;
+
+    original_len = strlen(original_text);
+    compressed_buf = compress_with_checksum(original_text, original_len, &compressed_len);
+    if (!TEST_CHECK(compressed_buf != NULL)) {
+        return;
+    }
+
+    ctx = flb_decompression_context_create(FLB_COMPRESSION_ALGORITHM_ZSTD, compressed_len);
+    output_buf = flb_malloc(original_len + 1);
+    if (!TEST_CHECK(ctx != NULL) || !TEST_CHECK(output_buf != NULL)) {
+        goto cleanup;
+    }
+
+    chunk_sizes[0] = compressed_len / 3;
+    chunk_sizes[1] = compressed_len / 2;
+    chunk_sizes[2] = compressed_len - chunk_sizes[0] - chunk_sizes[1];
+
+    for (i = 0; i < 3; i++) {
+        if (append_to_context(ctx, (char *) compressed_buf + consumed,
+                              chunk_sizes[i]) != 0) {
+            goto cleanup;
+        }
+        consumed += chunk_sizes[i];
+
+        output_len = original_len - total_written;
+        ret = flb_decompress(ctx, output_buf + total_written, &output_len);
+        if (!TEST_CHECK(ret == FLB_DECOMPRESSOR_SUCCESS)) {
+            goto cleanup;
+        }
+        total_written += output_len;
+    }
+
+    /* Only compare contents once the whole payload is known to be there */
+    if (TEST_CHECK(total_written == original_len)) {
+        TEST_CHECK(memcmp(original_text, output_buf, original_len) == 0);
+    }
+
+cleanup:
+    flb_free(compressed_buf);
+    if (output_buf != NULL) {
+        flb_free(output_buf);
+    }
+    if (ctx != NULL) {
+        flb_decompression_context_destroy(ctx);
+    }
+}
+
+/* A frame with one altered byte must be rejected and fail the context */
+void test_zstd_streaming_decompress_corrupted_data(void)
+{
+    struct flb_decompression_context *ctx = NULL;
+    const char *original_text = "this test ensures corrupted data with a checksum fails";
+    char *output_buf = NULL;
+    char *corrupted_input = NULL;
+    void *compressed_buf;
+    size_t original_len;
+    size_t compressed_len = 0;
+    size_t output_len;
+    int ret;
+
+    original_len = strlen(original_text);
+    compressed_buf = compress_with_checksum(original_text, original_len, &compressed_len);
+    if (!TEST_CHECK(compressed_buf != NULL)) {
+        return;
+    }
+
+    ctx = flb_decompression_context_create(FLB_COMPRESSION_ALGORITHM_ZSTD, compressed_len);
+    corrupted_input = flb_malloc(compressed_len);
+    output_buf = flb_malloc(original_len + 1);
+    if (!TEST_CHECK(ctx != NULL) ||
+        !TEST_CHECK(corrupted_input != NULL) ||
+        !TEST_CHECK(output_buf != NULL)) {
+        goto cleanup;
+    }
+
+    /* Work on a copy with a single byte altered in the middle of the frame */
+    memcpy(corrupted_input, compressed_buf, compressed_len);
+    corrupted_input[compressed_len / 2]++;
+
+    if (append_to_context(ctx, corrupted_input, compressed_len) != 0) {
+        goto cleanup;
+    }
+
+    output_len = original_len;
+    ret = flb_decompress(ctx, output_buf, &output_len);
+
+    TEST_CHECK(ret == FLB_DECOMPRESSOR_FAILURE);
+    TEST_CHECK(ctx->state == FLB_DECOMPRESSOR_STATE_FAILED);
+
+cleanup:
+    flb_free(compressed_buf);
+    if (corrupted_input != NULL) {
+        flb_free(corrupted_input);
+    }
+    if (output_buf != NULL) {
+        flb_free(output_buf);
+    }
+    if (ctx != NULL) {
+        flb_decompression_context_destroy(ctx);
+    }
+}
+
 TEST_LIST = {
     { "compress_small_string", test_compress_small_string },
     { "decompress_small_string", test_decompress_small_string },
@@ -170,5 +374,7 @@ TEST_LIST = {
     { "decompress_invalid_data", test_decompress_invalid_data },
     { "compress_decompress_large_data", test_compress_decompress_large_data },
     { "decompress_unknown_size", test_decompress_unknown_size },
+    { "streaming_decompress_multi_chunk", test_zstd_streaming_decompress_multi_chunk },
+    { "streaming_decompress_corrupted_data", test_zstd_streaming_decompress_corrupted_data },
     { NULL, NULL }
 };