Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

/*
 * Compression algorithm identifiers. The value is passed to
 * flb_decompression_context_create(), stored in the context's 'algorithm'
 * field, and used to pick the gzip or zstd backend when creating,
 * dispatching to and destroying the inner decompression context.
 */
#define FLB_COMPRESSION_ALGORITHM_NONE 0
#define FLB_COMPRESSION_ALGORITHM_GZIP 1
#define FLB_COMPRESSION_ALGORITHM_ZSTD 2

#define FLB_DECOMPRESSOR_STATE_FAILED -1
#define FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER 0
Expand Down
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_zstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@

#include <fluent-bit/flb_info.h>
#include <zstd.h>
#include <zstd_errors.h>

struct flb_decompression_context;

size_t flb_zstd_compress(void *in_data, size_t in_len, void **out_data, size_t *out_len);
size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t *out_len);

/*
 * Decompress one complete zstd frame from the context's read buffer into
 * output_buffer. On entry *output_length is the capacity of output_buffer;
 * on return it is the number of bytes produced (0 when no complete frame is
 * buffered yet). Returns FLB_DECOMPRESSOR_SUCCESS or FLB_DECOMPRESSOR_FAILURE.
 */
int flb_zstd_decompressor_dispatch(struct flb_decompression_context *context,
void *output_buffer,
size_t *output_length);
/*
 * Create the zstd-specific inner context (wraps a ZSTD_DCtx). Returns NULL
 * when the zstd decompression context cannot be created.
 */
void *flb_zstd_decompression_context_create(void);
/*
 * Release a context obtained from flb_zstd_decompression_context_create().
 * Passing NULL is a no-op.
 */
void flb_zstd_decompression_context_destroy(void *context);
#endif
26 changes: 23 additions & 3 deletions src/flb_compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_compression.h>

static size_t flb_decompression_context_get_read_buffer_offset(
Expand Down Expand Up @@ -131,7 +132,12 @@ void flb_decompression_context_destroy(struct flb_decompression_context *context
}

if (context->inner_context != NULL) {
flb_gzip_decompression_context_destroy(context->inner_context);
if (context->algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
flb_gzip_decompression_context_destroy(context->inner_context);
}
else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
flb_zstd_decompression_context_destroy(context->inner_context);
}

context->inner_context = NULL;
}
Expand Down Expand Up @@ -178,6 +184,9 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
context->inner_context = flb_gzip_decompression_context_create();
}
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
context->inner_context = flb_zstd_decompression_context_create();
}
else {
flb_error("invalid compression algorithm : %d", algorithm);

Expand All @@ -197,9 +206,14 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm
}

context->input_buffer_size = input_buffer_size;
context->read_buffer = context->read_buffer;
context->read_buffer = context->input_buffer;
context->algorithm = algorithm;
context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER;
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER;
}
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_BODY;
}

return context;
}
Expand All @@ -215,6 +229,12 @@ int flb_decompress(struct flb_decompression_context *context,
output_length);

}
else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
return flb_zstd_decompressor_dispatch(context,
output_buffer,
output_length);

}
}

return FLB_DECOMPRESSOR_FAILURE;
Expand Down
101 changes: 101 additions & 0 deletions src/flb_zstd.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#include <fluent-bit/flb_compression.h>
#include <fluent-bit/flb_zstd.h>

/*
 * zstd-specific state kept behind the generic decompression context's
 * 'inner_context' pointer. It is allocated by
 * flb_zstd_decompression_context_create() and released by
 * flb_zstd_decompression_context_destroy().
 */
struct flb_zstd_decompression_context {
/* zstd decompression handle: created with ZSTD_createDCtx(), freed with ZSTD_freeDCtx() */
ZSTD_DCtx *dctx;
};

/*
 * Default chunk size in bytes (64 KB). The expansion is parenthesized so the
 * macro behaves as a single value inside larger expressions such as
 * 'total / FLB_ZSTD_DEFAULT_CHUNK'.
 */
#define FLB_ZSTD_DEFAULT_CHUNK (64 * 1024) /* 64 KB buffer */

Expand Down Expand Up @@ -162,3 +165,101 @@ size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t
return 0;
}

/*
 * Frame-at-a-time zstd decompression for the generic decompression context.
 *
 * context        decompression context whose inner_context is the zstd state
 * output_buffer  destination for the decompressed bytes
 * output_length  in: capacity of output_buffer; out: bytes written
 *
 * Once the arguments have been validated, *output_length is reset to 0 and
 * is only set to a non-zero value when a frame was decoded.
 *
 * Returns FLB_DECOMPRESSOR_SUCCESS when a frame was decoded, when the input
 * is empty, or when the buffered input does not yet contain a complete frame.
 * Returns FLB_DECOMPRESSOR_FAILURE on invalid arguments, or when zstd reports
 * an error; in the latter case the context state is set to
 * FLB_DECOMPRESSOR_STATE_FAILED.
 */
int flb_zstd_decompressor_dispatch(struct flb_decompression_context *context,
                                   void *output_buffer,
                                   size_t *output_length)
{
    struct flb_zstd_decompression_context *inner;
    size_t capacity;
    size_t frame_len;
    size_t produced;

    if (context == NULL) {
        return FLB_DECOMPRESSOR_FAILURE;
    }

    if (context->inner_context == NULL || output_length == NULL) {
        return FLB_DECOMPRESSOR_FAILURE;
    }

    inner = (struct flb_zstd_decompression_context *) context->inner_context;

    /* Remember the caller's capacity before reporting "nothing produced" */
    capacity = *output_length;
    *output_length = 0;

    if (context->input_buffer_length == 0) {
        return FLB_DECOMPRESSOR_SUCCESS;
    }

    frame_len = ZSTD_findFrameCompressedSize(context->read_buffer,
                                             context->input_buffer_length);

    if (ZSTD_isError(frame_len)) {
        /*
         * srcSize_wrong only means the end of the frame has not been
         * buffered yet: report success with zero bytes so the caller can
         * append more input and retry.
         */
        if (ZSTD_getErrorCode(frame_len) == ZSTD_error_srcSize_wrong) {
            return FLB_DECOMPRESSOR_SUCCESS;
        }

        /* Anything else is unrecoverable */
        flb_error("[zstd] frame is corrupted: %s",
                  ZSTD_getErrorName(frame_len));
        context->state = FLB_DECOMPRESSOR_STATE_FAILED;

        return FLB_DECOMPRESSOR_FAILURE;
    }

    /* A whole frame is available: decode it with a single call */
    produced = ZSTD_decompressDCtx(inner->dctx,
                                   output_buffer,
                                   capacity,
                                   context->read_buffer,
                                   frame_len);

    if (ZSTD_isError(produced)) {
        flb_error("[zstd] decompression failed: %s",
                  ZSTD_getErrorName(produced));
        context->state = FLB_DECOMPRESSOR_STATE_FAILED;

        return FLB_DECOMPRESSOR_FAILURE;
    }

    /* Consume the frame from the input and publish the output size */
    context->input_buffer_length -= frame_len;
    context->read_buffer += frame_len;
    *output_length = produced;

    return FLB_DECOMPRESSOR_SUCCESS;
}

/*
 * Allocate the zstd inner decompression context and its ZSTD_DCtx handle.
 *
 * Returns the new context (to be released with
 * flb_zstd_decompression_context_destroy()), or NULL when either the
 * wrapper allocation or ZSTD_createDCtx() fails.
 */
void *flb_zstd_decompression_context_create(void)
{
    struct flb_zstd_decompression_context *context;

    context = flb_calloc(1, sizeof(struct flb_zstd_decompression_context));

    if (context == NULL) {
        flb_errno();
        /* Must bail out here: the code below dereferences 'context' */
        return NULL;
    }

    context->dctx = ZSTD_createDCtx();
    if (context->dctx == NULL) {
        flb_error("[zstd] could not create decompression context");
        flb_free(context);
        return NULL;
    }

    return (void *) context;
}

/*
 * Free a zstd inner decompression context together with the ZSTD_DCtx it
 * owns. A NULL context is ignored.
 */
void flb_zstd_decompression_context_destroy(void *context)
{
    struct flb_zstd_decompression_context *inner;

    if (context == NULL) {
        return;
    }

    inner = (struct flb_zstd_decompression_context *) context;

    if (inner->dctx != NULL) {
        ZSTD_freeDCtx(inner->dctx);
    }

    flb_free(inner);
}
156 changes: 156 additions & 0 deletions tests/internal/zstd.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_compression.h>
#include "flb_tests_internal.h"

/* try a small string */
Expand Down Expand Up @@ -163,12 +164,167 @@ static void test_decompress_unknown_size()
flb_free(decompressed_data);
}

/*
 * Test helper: append 'size' bytes from 'data' to the input buffer of 'ctx',
 * growing the buffer first when it does not have enough free space.
 *
 * If the buffer still cannot hold the data after the resize attempt, or no
 * append pointer is available, the failure is recorded through TEST_CHECK
 * and nothing is copied (instead of writing past the buffer or through NULL).
 */
static void append_to_context(struct flb_decompression_context *ctx, const void *data, size_t size)
{
    uint8_t *append_ptr;
    size_t available_space;
    size_t required_size;

    available_space = flb_decompression_context_get_available_space(ctx);

    if (size > available_space) {
        required_size = ctx->input_buffer_length + size;
        flb_decompression_context_resize_buffer(ctx, required_size);

        /* Confirm the resize really made room before copying anything */
        available_space = flb_decompression_context_get_available_space(ctx);
        if (!TEST_CHECK(size <= available_space)) {
            return;
        }
    }

    /* Get pointer to the write location */
    append_ptr = flb_decompression_context_get_append_buffer(ctx);
    if (!TEST_CHECK(append_ptr != NULL)) {
        return;
    }

    /* Copy the data */
    memcpy(append_ptr, data, size);

    ctx->input_buffer_length += size;
}

/*
 * Test helper: compress 'original_data' into a single zstd frame that carries
 * a content checksum.
 *
 * original_data / original_len  input to compress
 * compressed_len                out: size of the compressed frame, 0 on failure
 *
 * Returns a buffer allocated with flb_malloc() that the caller must release
 * with flb_free(), or NULL when any step fails (the failure is also recorded
 * through TEST_CHECK). On failure no zstd error code is ever stored in
 * *compressed_len.
 */
static void *compress_with_checksum(const void *original_data, size_t original_len,
                                    size_t *compressed_len)
{
    ZSTD_CCtx *cctx;
    void *compressed_buffer;
    size_t bound;
    size_t ret;

    *compressed_len = 0;

    /* Create a compression context */
    cctx = ZSTD_createCCtx();
    if (!TEST_CHECK(cctx != NULL)) {
        return NULL;
    }

    /*
     * THIS IS THE KEY: Explicitly enable content checksums in the frame.
     */
    ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
    if (!TEST_CHECK(!ZSTD_isError(ret))) {
        ZSTD_freeCCtx(cctx);
        return NULL;
    }

    /* Compress the data */
    bound = ZSTD_compressBound(original_len);
    compressed_buffer = flb_malloc(bound);
    if (!TEST_CHECK(compressed_buffer != NULL)) {
        ZSTD_freeCCtx(cctx);
        return NULL;
    }

    ret = ZSTD_compress2(cctx,
                         compressed_buffer, bound,
                         original_data, original_len);

    /* The compression context is no longer needed, whatever the outcome */
    ZSTD_freeCCtx(cctx);

    if (!TEST_CHECK(!ZSTD_isError(ret))) {
        flb_free(compressed_buffer);
        return NULL;
    }

    *compressed_len = ret;

    return compressed_buffer;
}

/*
 * Feed one checksummed zstd frame to the streaming decompressor in three
 * uneven chunks, calling flb_decompress() after each append, and verify that
 * the accumulated output equals the original text.
 *
 * Setup failures (compression, context creation, output allocation) abort
 * the test early instead of continuing with NULL pointers.
 */
void test_zstd_streaming_decompress_multi_chunk(void)
{
    struct flb_decompression_context *ctx = NULL;
    char *output_buf = NULL;
    size_t output_len;
    size_t total_written = 0;
    int ret;
    size_t chunk_sizes[3];
    size_t offset = 0;
    size_t i;
    char *original_text = "zstd streaming is a feature that must be tested with multiple, uneven chunks!";
    size_t original_len;
    void *compressed_buf = NULL;
    size_t compressed_len = 0;

    original_len = strlen(original_text);
    compressed_buf = compress_with_checksum(original_text, original_len, &compressed_len);
    if (!TEST_CHECK(compressed_buf != NULL)) {
        return;
    }

    ctx = flb_decompression_context_create(FLB_COMPRESSION_ALGORITHM_ZSTD, compressed_len);
    if (!TEST_CHECK(ctx != NULL)) {
        goto cleanup;
    }

    output_buf = flb_malloc(original_len + 1);
    if (!TEST_CHECK(output_buf != NULL)) {
        goto cleanup;
    }

    /* Uneven split: a third, a half, and whatever remains */
    chunk_sizes[0] = compressed_len / 3;
    chunk_sizes[1] = compressed_len / 2;
    chunk_sizes[2] = compressed_len - chunk_sizes[0] - chunk_sizes[1];

    for (i = 0; i < 3; i++) {
        append_to_context(ctx, (char *) compressed_buf + offset, chunk_sizes[i]);
        offset += chunk_sizes[i];

        /* Offer the remaining output capacity; the call reports bytes written */
        output_len = original_len - total_written;
        ret = flb_decompress(ctx, output_buf + total_written, &output_len);
        TEST_CHECK(ret == FLB_DECOMPRESSOR_SUCCESS);
        total_written += output_len;
    }

    TEST_CHECK(total_written == original_len);
    TEST_CHECK(memcmp(original_text, output_buf, original_len) == 0);

cleanup:
    flb_free(compressed_buf);
    if (output_buf != NULL) {
        flb_free(output_buf);
    }
    if (ctx != NULL) {
        flb_decompression_context_destroy(ctx);
    }
}

/* In tests/internal/zstd.c */

/*
 * Corrupt one byte in the middle of a checksummed zstd frame and verify that
 * flb_decompress() reports FLB_DECOMPRESSOR_FAILURE and leaves the context in
 * FLB_DECOMPRESSOR_STATE_FAILED.
 *
 * Setup failures (compression, context creation, allocations) abort the test
 * early instead of continuing with NULL pointers.
 */
void test_zstd_streaming_decompress_corrupted_data(void)
{
    struct flb_decompression_context *ctx = NULL;
    char *output_buf = NULL;
    size_t output_len;
    int ret;
    char *original_text = "this test ensures corrupted data with a checksum fails";
    size_t original_len = strlen(original_text);
    void *compressed_buf = NULL;
    size_t compressed_len = 0;
    char *corrupted_input = NULL;

    compressed_buf = compress_with_checksum(original_text, original_len, &compressed_len);
    if (!TEST_CHECK(compressed_buf != NULL)) {
        return;
    }

    ctx = flb_decompression_context_create(FLB_COMPRESSION_ALGORITHM_ZSTD, compressed_len);
    if (!TEST_CHECK(ctx != NULL)) {
        goto cleanup;
    }

    /* Create a corrupted copy of the input */
    corrupted_input = flb_malloc(compressed_len);
    if (!TEST_CHECK(corrupted_input != NULL)) {
        goto cleanup;
    }
    memcpy(corrupted_input, compressed_buf, compressed_len);
    /* Corrupt a byte in the middle */
    corrupted_input[compressed_len / 2]++;

    output_buf = flb_malloc(original_len + 1);
    if (!TEST_CHECK(output_buf != NULL)) {
        goto cleanup;
    }
    output_len = original_len;

    append_to_context(ctx, corrupted_input, compressed_len);

    ret = flb_decompress(ctx, output_buf, &output_len);

    TEST_CHECK(ret == FLB_DECOMPRESSOR_FAILURE);
    TEST_CHECK(ctx->state == FLB_DECOMPRESSOR_STATE_FAILED);

cleanup:
    flb_free(compressed_buf);
    if (corrupted_input != NULL) {
        flb_free(corrupted_input);
    }
    if (output_buf != NULL) {
        flb_free(output_buf);
    }
    if (ctx != NULL) {
        flb_decompression_context_destroy(ctx);
    }
}


/*
 * Test registry: each entry pairs a test name with the function implementing
 * it. The { NULL, NULL } entry terminates the list.
 * NOTE(review): TEST_LIST is presumably supplied by the test framework pulled
 * in through flb_tests_internal.h - confirm.
 */
TEST_LIST = {
{ "compress_small_string", test_compress_small_string },
{ "decompress_small_string", test_decompress_small_string },
{ "compress_empty_input", test_compress_empty_input },
{ "decompress_invalid_data", test_decompress_invalid_data },
{ "compress_decompress_large_data", test_compress_decompress_large_data },
{ "decompress_unknown_size", test_decompress_unknown_size },
/* Streaming decompression through the generic flb_decompress() API */
{ "streaming_decompress_multi_chunk", test_zstd_streaming_decompress_multi_chunk },
{ "streaming_decompress_corrupted_data", test_zstd_streaming_decompress_corrupted_data },
{ NULL, NULL }
};
Loading