From b9706e4cfb0a61f149d72a93c4a9b4f823379258 Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Tue, 5 Aug 2025 14:56:31 +0900
Subject: [PATCH 01/12] build: Check existence of arrow-glib-parquet library

Signed-off-by: Hiroshi Hatake
---
 CMakeLists.txt                           | 8 ++++++++
 src/aws/compression/arrow/CMakeLists.txt | 4 ++++
 2 files changed, 12 insertions(+)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5d9e7c879c9..093ff0bd1fb 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1232,6 +1232,14 @@ else()
   set(FLB_ARROW OFF)
 endif()

+# Additional prerequisites for Apache Parquet
+pkg_check_modules(ARROW_GLIB_PARQUET QUIET parquet-glib)
+if(FLB_ARROW AND ARROW_GLIB_PARQUET_FOUND)
+  FLB_DEFINITION(FLB_HAVE_ARROW_PARQUET)
+else()
+  message(STATUS "Arrow GLib Parquet not found. Disabling parquet compression for AWS module")
+endif()
+
 # EBPF Support
 # ============
 if (FLB_IN_EBPF)
diff --git a/src/aws/compression/arrow/CMakeLists.txt b/src/aws/compression/arrow/CMakeLists.txt
index 846f654412d..dd384f8dc8a 100644
--- a/src/aws/compression/arrow/CMakeLists.txt
+++ b/src/aws/compression/arrow/CMakeLists.txt
@@ -5,3 +5,7 @@ add_library(flb-aws-arrow STATIC ${src})

 target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
 target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS})
+if (ARROW_GLIB_PARQUET_FOUND)
+target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_PARQUET_INCLUDE_DIRS})
+target_link_libraries(flb-aws-arrow ${ARROW_GLIB_PARQUET_LDFLAGS})
+endif()

From d5fc9e26394506e2b39d41241da2aefca0159197 Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Tue, 5 Aug 2025 15:02:02 +0900
Subject: [PATCH 02/12] aws: compression: Implement compression of Apache Arrow Parquet

Signed-off-by: Hiroshi Hatake
---
 src/aws/compression/arrow/compress.c | 123 +++++++++++++++++++++++++++
 src/aws/compression/arrow/compress.h |  15 ++++
 2 files changed, 138 insertions(+)

diff --git a/src/aws/compression/arrow/compress.c b/src/aws/compression/arrow/compress.c
index a48b34f8096..6deca8c91fa 100644
--- a/src/aws/compression/arrow/compress.c
+++ b/src/aws/compression/arrow/compress.c
@@ -8,6 +8,9 @@
  */

 #include
+#ifdef FLB_HAVE_ARROW_PARQUET
+#include
+#endif
 #include

 /*
@@ -145,3 +148,123 @@ int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_s
     g_bytes_unref(bytes);
     return 0;
 }
+
+#ifdef FLB_HAVE_ARROW_PARQUET
+static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table)
+{
+    GArrowResizableBuffer *buffer;
+    GArrowBufferOutputStream *sink;
+    GParquetArrowFileWriter *writer;
+    GArrowSchema *schema;
+    GError *error = NULL;
+    gboolean success;
+    gint64 n_rows = 0;
+
+    buffer = garrow_resizable_buffer_new(0, &error);
+    if (buffer == NULL) {
+        g_error_free(error);
+        return NULL;
+    }
+
+    sink = garrow_buffer_output_stream_new(buffer);
+    if (sink == NULL) {
+        g_object_unref(buffer);
+        return NULL;
+    }
+
+    schema = garrow_table_get_schema(table);
+    if (schema == NULL) {
+        g_object_unref(buffer);
+        g_object_unref(sink);
+        return NULL;
+    }
+
+    /* Create a new Parquet file writer */
+    writer = gparquet_arrow_file_writer_new_arrow(schema,
+                                                  GARROW_OUTPUT_STREAM(sink),
+                                                  NULL, /* Arrow writer properties */
+                                                  &error);
+    g_object_unref(schema);
+    if (writer == NULL) {
+        g_error_free(error);
+        g_object_unref(buffer);
+        g_object_unref(sink);
+        return NULL;
+    }
+
+    n_rows = garrow_table_get_n_rows(table);
+
+    /* Write the entire table to the Parquet file buffer */
+    success = gparquet_arrow_file_writer_write_table(writer, table, n_rows, &error);
+    if (!success) {
+        g_error_free(error);
+        g_object_unref(buffer);
+        g_object_unref(sink);
+        g_object_unref(writer);
+        return NULL;
+    }
+
+    /* Close the writer to finalize the Parquet file metadata */
+    success = gparquet_arrow_file_writer_close(writer, &error);
+    if (!success) {
+        g_error_free(error);
+        g_object_unref(buffer);
+        g_object_unref(sink);
+        g_object_unref(writer);
+        return NULL;
+    }
+
+    g_object_unref(sink);
+    g_object_unref(writer);
+    return buffer;
+}
+
+
+int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size)
+{
+    GArrowTable *table;
+    GArrowResizableBuffer *buffer;
+    GBytes *bytes;
+    gconstpointer ptr;
+    gsize len;
+    uint8_t *buf;
+
+    table = parse_json((uint8_t *) json, size);
+    if (table == NULL) {
+        return -1;
+    }
+
+    buffer = table_to_parquet_buffer(table);
+    g_object_unref(table);
+    if (buffer == NULL) {
+        return -1;
+    }
+
+    bytes = garrow_buffer_get_data(GARROW_BUFFER(buffer));
+    if (bytes == NULL) {
+        g_object_unref(buffer);
+        return -1;
+    }
+
+    ptr = g_bytes_get_data(bytes, &len);
+    if (ptr == NULL) {
+        g_object_unref(buffer);
+        g_bytes_unref(bytes);
+        return -1;
+    }
+
+    buf = malloc(len);
+    if (buf == NULL) {
+        g_object_unref(buffer);
+        g_bytes_unref(bytes);
+        return -1;
+    }
+    memcpy(buf, ptr, len);
+    *out_buf = (void *) buf;
+    *out_size = len;
+
+    g_object_unref(buffer);
+    g_bytes_unref(bytes);
+    return 0;
+}
+#endif
diff --git a/src/aws/compression/arrow/compress.h b/src/aws/compression/arrow/compress.h
index 82e94f43cee..f8dcd4a4248 100644
--- a/src/aws/compression/arrow/compress.h
+++ b/src/aws/compression/arrow/compress.h
@@ -11,3 +11,18 @@
  */
 int out_s3_compress_arrow(void *json, size_t size, void **out_buf, size_t *out_size);
+
+#ifdef FLB_HAVE_ARROW_PARQUET
+/*
+ * This function converts out_s3 buffer into Apache Parquet format.
+ *
+ * `json` is a string that contain (concatenated) JSON objects.
+ *
+ * `size` is the length of the json data (excluding the trailing
+ * null-terminator character).
+ *
+ * Return 0 on success (with `out_buf` and `out_size` updated),
+ * and -1 on failure
+ */
+int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out_size);
+#endif

From 833f5b100a58ffcef0b80f23eae3db80aace514e Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Tue, 5 Aug 2025 15:07:49 +0900
Subject: [PATCH 03/12] aws: Implement a handler for parquet compression

Signed-off-by: Hiroshi Hatake
---
 include/fluent-bit/aws/flb_aws_compress.h | 7 ++++---
 src/aws/flb_aws_compress.c                | 7 +++++++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/include/fluent-bit/aws/flb_aws_compress.h b/include/fluent-bit/aws/flb_aws_compress.h
index e1cf9222377..d9e929c6669 100644
--- a/include/fluent-bit/aws/flb_aws_compress.h
+++ b/include/fluent-bit/aws/flb_aws_compress.h
@@ -21,9 +21,10 @@
 #define FLB_AWS_COMPRESS
 #include

-#define FLB_AWS_COMPRESS_NONE 0
-#define FLB_AWS_COMPRESS_GZIP 1
-#define FLB_AWS_COMPRESS_ARROW 2
+#define FLB_AWS_COMPRESS_NONE    0
+#define FLB_AWS_COMPRESS_GZIP    1
+#define FLB_AWS_COMPRESS_ARROW   2
+#define FLB_AWS_COMPRESS_PARQUET 3

 /*
  * Get compression type from compression keyword. The return value is used to identify
diff --git a/src/aws/flb_aws_compress.c b/src/aws/flb_aws_compress.c
index a06d181193f..253020e392a 100644
--- a/src/aws/flb_aws_compress.c
+++ b/src/aws/flb_aws_compress.c
@@ -54,6 +54,13 @@ static const struct compression_option compression_options[] = {
         "arrow",
         &out_s3_compress_arrow
     },
+#endif
+#ifdef FLB_HAVE_ARROW_PARQUET
+    {
+        FLB_AWS_COMPRESS_PARQUET,
+        "parquet",
+        &out_s3_compress_parquet
+    },
 #endif
     { 0 }
 };

From 6d78490ffc3111a186dbacd700eb5d4cdeb2fcb1 Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Tue, 5 Aug 2025 15:25:19 +0900
Subject: [PATCH 04/12] out_s3: Use arrow or parquet correctly as compression methods

Signed-off-by: Hiroshi Hatake
---
 plugins/out_s3/s3.c | 28 ++++++++++++++++------------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index af3c9c37bea..a1a8484181b 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -703,9 +703,11 @@ static int cb_s3_init(struct flb_output_instance *ins,
         flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
         return -1;
     }
-    if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_ARROW) {
+    if (ctx->use_put_object == FLB_FALSE &&
+        (ctx->compression == FLB_AWS_COMPRESS_ARROW ||
+         ctx->compression == FLB_AWS_COMPRESS_PARQUET)) {
         flb_plg_error(ctx->ins,
-                      "use_put_object must be enabled when Apache Arrow is enabled");
+                      "use_put_object must be enabled when Apache Arrow or Parquet is enabled");
         return -1;
     }
     ctx->compression = ret;
@@ -730,7 +732,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
             flb_plg_error(ctx->ins, "upload_chunk_size must be at least 5,242,880 bytes");
             return -1;
         }
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             if(ctx->upload_chunk_size > MAX_CHUNKED_UPLOAD_COMPRESS_SIZE) {
                 flb_plg_error(ctx->ins, "upload_chunk_size in compressed multipart upload cannot exceed 5GB");
                 return -1;
@@ -1125,7 +1127,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         file_first_log_time = chunk->first_log_time;
     }

-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
         /* Map payload */
         ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
         if (ret == -1) {
@@ -1168,7 +1170,9 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             goto multipart;
         }
         else {
-            if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+            if (ctx->use_put_object == FLB_FALSE &&
+                (ctx->compression == FLB_AWS_COMPRESS_ARROW ||
+                 ctx->compression == FLB_AWS_COMPRESS_PARQUET)) {
                 flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, "
                              "the chunk was too small, using PutObject to upload", preCompress_size, body_size);
             }
@@ -1190,7 +1194,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
          * remove chunk from buffer list
          */
         ret = s3_put_object(ctx, tag, file_first_log_time, body, body_size);
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             flb_free(payload_buf);
         }
         if (ret < 0) {
@@ -1217,7 +1221,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             if (chunk) {
                 s3_store_file_unlock(chunk);
             }
-            if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+            if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
                 flb_free(payload_buf);
             }
             return FLB_RETRY;
@@ -1231,7 +1235,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             if (chunk) {
                 s3_store_file_unlock(chunk);
             }
-            if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+            if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
                 flb_free(payload_buf);
             }
             return FLB_RETRY;
@@ -1241,7 +1245,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,

     ret = upload_part(ctx, m_upload, body, body_size, NULL);
     if (ret < 0) {
-        if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+        if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
             flb_free(payload_buf);
         }
         m_upload->upload_errors += 1;
@@ -1258,7 +1262,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         s3_store_file_delete(ctx, chunk);
         chunk = NULL;
     }
-    if (ctx->compression == FLB_AWS_COMPRESS_GZIP) {
+    if (ctx->compression != FLB_AWS_COMPRESS_NONE) {
         flb_free(payload_buf);
     }
     if (m_upload->bytes >= ctx->file_size) {
@@ -3991,8 +3995,8 @@ static struct flb_config_map config_map[] = {
     {
      FLB_CONFIG_MAP_STR, "compression", NULL,
      0, FLB_FALSE, 0,
-     "Compression type for S3 objects. 'gzip' and 'arrow' are the supported values. "
-     "'arrow' is only an available if Apache Arrow was enabled at compile time. "
+     "Compression type for S3 objects. 'gzip', 'arrow' and 'parquet' are the supported values. "
+     "'arrow' and 'parquet' are only available if Apache Arrow was enabled at compile time. "
      "Defaults to no compression. "
      "If 'gzip' is selected, the Content-Encoding HTTP Header will be set to 'gzip'."
     },

From 974e9075e1067b8f6c4f5d2710f095a2cfb1fb26 Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Tue, 5 Aug 2025 17:14:38 +0900
Subject: [PATCH 05/12] aws: compress: Add error logs

Signed-off-by: Hiroshi Hatake
---
 src/aws/compression/arrow/compress.c | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/aws/compression/arrow/compress.c b/src/aws/compression/arrow/compress.c
index 6deca8c91fa..8b91a715750 100644
--- a/src/aws/compression/arrow/compress.c
+++ b/src/aws/compression/arrow/compress.c
@@ -11,6 +11,7 @@
 #ifdef FLB_HAVE_ARROW_PARQUET
 #include
 #endif
+#include
 #include

 /*
@@ -186,6 +187,7 @@ static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table)
                                                   &error);
     g_object_unref(schema);
     if (writer == NULL) {
+        flb_error("[aws][compress] Failed to create parquet writer: %s", error->message);
         g_error_free(error);
         g_object_unref(buffer);
         g_object_unref(sink);
@@ -197,6 +199,7 @@ static GArrowResizableBuffer* table_to_parquet_buffer(GArrowTable *table)
     /* Write the entire table to the Parquet file buffer */
     success = gparquet_arrow_file_writer_write_table(writer, table, n_rows, &error);
     if (!success) {
+        flb_error("[aws][compress] Failed to write table to parquet buffer: %s", error->message);
         g_error_free(error);
         g_object_unref(buffer);
         g_object_unref(sink);
@@ -231,12 +234,14 @@ int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out

     table = parse_json((uint8_t *) json, size);
     if (table == NULL) {
+        flb_error("[aws][compress] Failed to parse JSON into Arrow Table for Parquet conversion");
         return -1;
     }

     buffer = table_to_parquet_buffer(table);
     g_object_unref(table);
     if (buffer == NULL) {
+        flb_error("[aws][compress] Failed to convert Arrow Table into Parquet buffer");
         return -1;
     }

From b0b9586e45c31c2a9e5160966f3c906844d9f2e1 Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Tue, 5 Aug 2025 18:46:19 +0900
Subject: [PATCH 06/12] build: aws: compress: Use LIBRARIES instead of LDFLAGS for safety

Signed-off-by: Hiroshi Hatake
---
 src/aws/compression/arrow/CMakeLists.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/aws/compression/arrow/CMakeLists.txt b/src/aws/compression/arrow/CMakeLists.txt
index dd384f8dc8a..6448a971884 100644
--- a/src/aws/compression/arrow/CMakeLists.txt
+++ b/src/aws/compression/arrow/CMakeLists.txt
@@ -4,8 +4,8 @@ set(src
 add_library(flb-aws-arrow STATIC ${src})

 target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
-target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS})
+target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LIBRARIES})
 if (ARROW_GLIB_PARQUET_FOUND)
 target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_PARQUET_INCLUDE_DIRS})
-target_link_libraries(flb-aws-arrow ${ARROW_GLIB_PARQUET_LDFLAGS})
+target_link_libraries(flb-aws-arrow ${ARROW_GLIB_PARQUET_LIBRARIES})
 endif()

From 98b047f52daae53cd3633a8f3b1ee61c6200974c Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Thu, 7 Aug 2025 16:32:49 +0900
Subject: [PATCH 07/12] workflows: Add FLB_ARROW task for testing

Signed-off-by: Hiroshi Hatake
---
 .github/workflows/unit-tests.yaml | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml
index a5b79f8cb27..95529db5d64 100644
--- a/.github/workflows/unit-tests.yaml
+++ b/.github/workflows/unit-tests.yaml
@@ -52,6 +52,7 @@ jobs:
           - "-DFLB_SANITIZE_THREAD=On"
           - "-DFLB_SIMD=On"
           - "-DFLB_SIMD=Off"
+          - "-DFLB_ARROW=On"
         cmake_version:
           - "3.31.6"
         compiler:
@@ -66,6 +67,10 @@ jobs:
             compiler:
               cc: clang
               cxx: clang++
+          - flb_option: "-DFLB_ARROW=On"
+            compiler:
+              cc: clang
+              cxx: clang++
     permissions:
       contents: read
     steps:
@@ -86,6 +91,15 @@ jobs:
         with:
           repository: calyptia/fluent-bit-ci
          path: ci
+      - name: Setup Apache Arrow libraries for parquet (-DFLB_ARROW=On Only)
+        if: matrix.flb_option == '-DFLB_ARROW=On'
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y -V ca-certificates lsb-release wget
+          wget https://packages.apache.org/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
+          sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
+          sudo apt-get update
+          sudo apt-get install -y -V libarrow-glib-dev libparquet-glib-dev
       - name: ${{ matrix.compiler.cc }} & ${{ matrix.compiler.cxx }} - ${{ matrix.flb_option }}
         run: |

From a6037ee2b5a284a368c7b26517bddf3b4af6b55a Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Thu, 7 Aug 2025 19:43:00 +0900
Subject: [PATCH 08/12] build: aws: compression: Use LDFLAGS instead of LIBRARIES

Using LIBRARIES does not point into the right place of the apache
arrow-glib and parquet-glib libraries.

Signed-off-by: Hiroshi Hatake
---
 src/aws/compression/arrow/CMakeLists.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/aws/compression/arrow/CMakeLists.txt b/src/aws/compression/arrow/CMakeLists.txt
index 6448a971884..dd384f8dc8a 100644
--- a/src/aws/compression/arrow/CMakeLists.txt
+++ b/src/aws/compression/arrow/CMakeLists.txt
@@ -4,8 +4,8 @@ set(src
 add_library(flb-aws-arrow STATIC ${src})

 target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_INCLUDE_DIRS})
-target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LIBRARIES})
+target_link_libraries(flb-aws-arrow ${ARROW_GLIB_LDFLAGS})
 if (ARROW_GLIB_PARQUET_FOUND)
 target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_PARQUET_INCLUDE_DIRS})
-target_link_libraries(flb-aws-arrow ${ARROW_GLIB_PARQUET_LIBRARIES})
+target_link_libraries(flb-aws-arrow ${ARROW_GLIB_PARQUET_LDFLAGS})
 endif()

From 387c22d4e1d41157a97c7f0a04a1949f6e98c6d1 Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Mon, 25 Aug 2025 17:46:21 +0900
Subject: [PATCH 09/12] out_s3: Restore detecting small fragmented gzip compressions clause

Signed-off-by: Hiroshi Hatake
---
 plugins/out_s3/s3.c | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index a1a8484181b..913c245a7ce 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -1170,9 +1170,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
             goto multipart;
         }
         else {
-            if (ctx->use_put_object == FLB_FALSE &&
-                (ctx->compression == FLB_AWS_COMPRESS_ARROW ||
-                 ctx->compression == FLB_AWS_COMPRESS_PARQUET)) {
+            if (ctx->use_put_object == FLB_FALSE && ctx->compression == FLB_AWS_COMPRESS_GZIP) {
                 flb_plg_info(ctx->ins, "Pre-compression upload_chunk_size= %zu, After compression, chunk is only %zu bytes, "
                              "the chunk was too small, using PutObject to upload", preCompress_size, body_size);
             }

From 762323ed626b23f6098da3e573e16f69a1137ad5 Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Mon, 25 Aug 2025 20:22:52 +0900
Subject: [PATCH 10/12] build: Make more generalized message

Signed-off-by: Hiroshi Hatake
---
 CMakeLists.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 093ff0bd1fb..c4d53f99460 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1237,7 +1237,7 @@ pkg_check_modules(ARROW_GLIB_PARQUET QUIET parquet-glib)
 if(FLB_ARROW AND ARROW_GLIB_PARQUET_FOUND)
   FLB_DEFINITION(FLB_HAVE_ARROW_PARQUET)
 else()
-  message(STATUS "Arrow GLib Parquet not found. Disabling parquet compression for AWS module")
+  message(STATUS "Arrow GLib Parquet not found. Disabling parquet compression")
 endif()

 # EBPF Support

From fbaab0601445eea0a3159783d027c092f9579666 Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Mon, 25 Aug 2025 20:26:32 +0900
Subject: [PATCH 11/12] arrow: compress: Use flb_malloc instead of raw malloc and set errno

Signed-off-by: Hiroshi Hatake
---
 src/aws/compression/arrow/CMakeLists.txt | 4 ++++
 src/aws/compression/arrow/compress.c     | 4 +++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/aws/compression/arrow/CMakeLists.txt b/src/aws/compression/arrow/CMakeLists.txt
index dd384f8dc8a..c80a09df3ec 100644
--- a/src/aws/compression/arrow/CMakeLists.txt
+++ b/src/aws/compression/arrow/CMakeLists.txt
@@ -9,3 +9,7 @@ if (ARROW_GLIB_PARQUET_FOUND)
 target_include_directories(flb-aws-arrow PRIVATE ${ARROW_GLIB_PARQUET_INCLUDE_DIRS})
 target_link_libraries(flb-aws-arrow ${ARROW_GLIB_PARQUET_LDFLAGS})
 endif()
+
+if(FLB_JEMALLOC)
+  target_link_libraries(flb-aws-arrow ${JEMALLOC_LIBRARIES})
+endif()
diff --git a/src/aws/compression/arrow/compress.c b/src/aws/compression/arrow/compress.c
index 8b91a715750..000d629b553 100644
--- a/src/aws/compression/arrow/compress.c
+++ b/src/aws/compression/arrow/compress.c
@@ -12,6 +12,7 @@
 #include
 #endif
 #include
+#include
 #include

 /*
@@ -258,8 +259,9 @@ int out_s3_compress_parquet(void *json, size_t size, void **out_buf, size_t *out
         return -1;
     }

-    buf = malloc(len);
+    buf = flb_malloc(len);
     if (buf == NULL) {
+        flb_errno();
         g_object_unref(buffer);
         g_bytes_unref(bytes);
         return -1;

From 5a0267fcfa8adf6ad0d1c1b33766257e6d84f54b Mon Sep 17 00:00:00 2001
From: Hiroshi Hatake
Date: Mon, 25 Aug 2025 20:34:29 +0900
Subject: [PATCH 12/12] out_s3: Address comments

Signed-off-by: Hiroshi Hatake
---
 plugins/out_s3/s3.c | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c
index 913c245a7ce..d9d25f187b1 100644
--- a/plugins/out_s3/s3.c
+++ b/plugins/out_s3/s3.c
@@ -704,8 +704,8 @@ static int cb_s3_init(struct flb_output_instance *ins,
         return -1;
     }
     if (ctx->use_put_object == FLB_FALSE &&
-        (ctx->compression == FLB_AWS_COMPRESS_ARROW ||
-         ctx->compression == FLB_AWS_COMPRESS_PARQUET)) {
+        (ret == FLB_AWS_COMPRESS_ARROW ||
+         ret == FLB_AWS_COMPRESS_PARQUET)) {
         flb_plg_error(ctx->ins,
                       "use_put_object must be enabled when Apache Arrow or Parquet is enabled");
         return -1;
@@ -1132,8 +1132,13 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
         ret = flb_aws_compression_compress(ctx->compression, body, body_size, &payload_buf, &payload_size);
         if (ret == -1) {
             flb_plg_error(ctx->ins, "Failed to compress data");
+            if (chunk != NULL) {
+                s3_store_file_unlock(chunk);
+                chunk->failures += 1;
+            }
             return FLB_RETRY;
-        } else {
+        }
+        else {
             preCompress_size = body_size;
             body = (void *) payload_buf;
             body_size = payload_size;