
Conversation

@Hind-M (Member) commented Oct 7, 2025

Only handling the LZ4 codec for now (zstd will be handled in a follow-up PR).

@codecov-commenter commented Oct 8, 2025

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 86.56126% with 34 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (main@dae8209). Learn more about missing BASE report.

Files with missing lines                                Patch %   Lines
src/compression.cpp                                     85.50%    10 Missing ⚠️
src/arrow_interface/arrow_array.cpp                     63.15%    7 Missing ⚠️
src/deserialize_fixedsizebinary_array.cpp               0.00%     6 Missing ⚠️
...row_ipc/deserialize_variable_size_binary_array.hpp   70.58%    5 Missing ⚠️
src/deserialize_utils.cpp                               84.21%    3 Missing ⚠️
src/serialize_utils.cpp                                 95.00%    2 Missing ⚠️
include/sparrow_ipc/chunk_memory_serializer.hpp         66.66%    1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main      #30   +/-   ##
=======================================
  Coverage        ?   76.96%           
=======================================
  Files           ?       32           
  Lines           ?     1502           
  Branches        ?        0           
=======================================
  Hits            ?     1156           
  Misses          ?      346           
  Partials        ?        0           
Flag Coverage Δ
unittests 76.96% <86.56%> (?)

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

@Hind-M Hind-M changed the title from "Handle compression in serialization" to "Handle lz4 compression" on Oct 14, 2025
@Hind-M Hind-M force-pushed the compr_ser branch 2 times, most recently from 0d8c498 to c5eb667 on October 16, 2025 10:00
@Hind-M (Member Author) commented Oct 16, 2025

There are still a few things missing and some room for improvement, but I suggest merging this ASAP to avoid further conflicts (I just resolved the ones that appeared after merging #29 and had to rework the compression in the serialization part; for now, this PR is just about getting something working).
I'll address the TODO items in a follow-up PR. Any comments or reviews regarding design choices can be noted, but they will be addressed in the next PRs.

@Hind-M Hind-M marked this pull request as ready for review October 16, 2025 12:04
@Alex-PLACET (Member) commented:

Can you add tests which only test the compression/decompression of a buffer?
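
For instance, a round-trip test along these lines (a minimal sketch assuming doctest and hypothetical sparrow_ipc::compress/sparrow_ipc::decompress free functions with a sparrow-ipc codec enum; the actual names in src/compression.cpp may differ):

#include <cstdint>
#include <numeric>
#include <vector>

#include <doctest/doctest.h>

// Hypothetical API; the real signatures in src/compression.cpp may differ:
//   std::vector<uint8_t> compress(CompressionType codec, std::span<const uint8_t> data);
//   std::vector<uint8_t> decompress(CompressionType codec, std::span<const uint8_t> data);

TEST_CASE("LZ4 buffer compression round-trip")
{
    std::vector<uint8_t> original(1024);
    std::iota(original.begin(), original.end(), uint8_t{0});  // repetitive, compressible pattern

    const auto compressed = sparrow_ipc::compress(sparrow_ipc::CompressionType::LZ4_FRAME, original);
    CHECK(compressed != original);

    const auto restored = sparrow_ipc::decompress(sparrow_ipc::CompressionType::LZ4_FRAME, compressed);
    CHECK(restored == original);
}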

* @param compression Optional: The compression type to use for record batch bodies.
*/
- chunk_serializer(chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream);
+ chunk_serializer(chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream, std::optional<org::apache::arrow::flatbuf::CompressionType> compression = std::nullopt);
Member:

Create a sparrow-ipc enum to keep public signatures free from flatbuffers

Member Author:

This will be done in a follow-up PR (added a TODO for now).

if (compression.has_value())
{
// If compressed, the body size is the sum of compressed buffer sizes + original size prefixes + padding
auto [compressed_body, compressed_buffers] = generate_compressed_body_and_buffers(record_batch, compression.value());
Member:

We don't want to compress the data just to calculate the size of the message.
I saw that LZ4F_compressFrameBound can give the maximum size of the compressed buffer. I think this should be used instead.
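
For reference, a minimal sketch against the real lz4frame.h API (reserve the bound, compress, then shrink; error handling elided):

#include <cstddef>
#include <cstdint>
#include <vector>

#include <lz4frame.h>

// Worst-case reservation: LZ4F_compressFrameBound() bounds the frame size,
// but the exact compressed size is only known after compressing.
std::vector<uint8_t> lz4_compress(const uint8_t* src, std::size_t src_size)
{
    const std::size_t bound = LZ4F_compressFrameBound(src_size, nullptr);
    std::vector<uint8_t> dst(bound);
    const std::size_t written = LZ4F_compressFrame(dst.data(), dst.size(), src, src_size, nullptr);
    // Real code must check LZ4F_isError(written) before using the result.
    dst.resize(written);
    return dst;
}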

Member:

No, you want the exact size, not the maximum size (which is going to be some trivial calculation such as uncompressed size + K).

Member:

This function is used for the memory reservation, not for the message header.
Can you know the compressed size without compressing the data first?

Member:

No, you can't.

Member:

We will have an issue with the fill_buffers function in flatbuffer_utils.cpp.
In this function we create the flatbuffer::Buffer entries, which hold the offset and size of each buffer in the body.
As the sizes of the buffers are unknown before the data is compressed, you can't create the record_batch message.
It means that we have to compress the buffers before creating the message and keep the compressed buffers in memory.
Once all the buffers are compressed, we can finally create and send the record_batch message.
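
A sketch of that order of operations (record_batch_buffers, lz4_compress, and align_to_8 are placeholder names; the 8-byte prefix is the uncompressed-length header used by the Arrow IPC body-compression layout):

// Pass 1: compress every buffer, keeping the results alive, and record offsets/sizes.
std::vector<std::vector<uint8_t>> compressed_bodies;
std::vector<org::apache::arrow::flatbuf::Buffer> fb_buffers;
int64_t offset = 0;
for (const auto& buf : record_batch_buffers)
{
    compressed_bodies.push_back(lz4_compress(buf.data(), buf.size()));
    const int64_t size = 8 + static_cast<int64_t>(compressed_bodies.back().size());
    fb_buffers.emplace_back(offset, size);
    offset += align_to_8(size);  // buffers in the body are 8-byte aligned
}
// Pass 2: only now can the RecordBatch flatbuffer be built from fb_buffers,
// after which the message and the compressed bodies are written out.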

Member:

BTW a test where we try to deserialize our serialized output with compression is missing. It should not work because of what I said in the previous message.

Member:

I'm wondering whether we should split the code execution into two different branches for compressed vs uncompressed buffers when we create record batch messages.
Trying to keep the same code path seems to lead to code complexity without much benefit.

Member:

I'm wondering whether we should split the code execution into two different branches for compressed vs uncompressed buffers when we create record batch messages.

No, this is really a bad idea because you don't want to write buffers as compressed when they are not compressible (see my other comments about this).

@Hind-M (Member Author) commented Oct 22, 2025:

We will have an issue with the fill_buffers function in flatbuffer_utils.cpp.

fill_buffers is only used in get_buffers, which is called when there is no compression; otherwise we use generate_compressed_buffers (see get_record_batch_message_builder with the new changes).

BTW a test where we try to deserialize our serialized output with compression is missing. It should not work because of what I said in the previous message.

There are tests doing that (see TEST_CASE("Compare record_batch serialization with stream file using LZ4 compression")), are you thinking of something else?
On the other hand, I think we should eventually add tests that write streams and compare them...


#include <sparrow/record_batch.hpp>

#include "Message_generated.h"
Member:

For the record, in Arrow C++ we ensure that flatbuffers headers (and any other dependency) are not exposed through public Arrow headers.

Member Author:

Yes this will be done in a follow-up PR.

memory_output_stream stream(buffer);
any_output_stream astream(stream);
- serialize_record_batch(rb, astream);
+ serialize_record_batch(rb, astream, m_compression);
Member:

Side note: this concatenates all output buffers into a single chunk even though we have chunked_memory_output_stream, which would avoid such copies. It is a bit of a waste.

Comment on lines 51 to 136
if (data.empty())
{
return {};
}
Member:

Hmm, this should never happen according to the Flatbuffers spec. Did you encounter this situation somewhere?

Member Author:

Actually I realized we are compressing/decompressing all buffers (validity buffers and data buffers), and we need this empty-buffer case when there is a validity bitmap with no nulls.
The spec doesn't really say anything about compressing these...
I'm wondering if we should not do it, or leave it as is, since discriminating between buffers could make things more complex...

Member:

I mean, if the buffer was compressed, then the compressed buffer cannot be empty as all compressors (LZ4, ZSTD) add a header of their own, even if the original buffer was empty:

>>> import lz4.frame
>>> lz4.frame.compress(b"")
b'\x04"M\x18`@\x82\x00\x00\x00\x00'
>>> import zstandard
>>> zstandard.compress(b"")
b'(\xb5/\xfd \x00\x01\x00\x00'

(but of course, this also means that the writer should not have compressed an empty buffer, because doing so increases the data size instead of decreasing it :))
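
A writer-side sketch of that rule (the Arrow IPC format allows storing a buffer uncompressed with the 8-byte length prefix set to -1 when compression does not pay off; write_int64_le, lz4_compress, and the stream's write method are placeholder names):

// Fall back to the raw bytes when compression would grow the buffer.
void write_body_buffer(any_output_stream& out, std::span<const uint8_t> raw)
{
    const std::vector<uint8_t> packed = lz4_compress(raw.data(), raw.size());
    if (!raw.empty() && packed.size() < raw.size())
    {
        write_int64_le(out, static_cast<int64_t>(raw.size()));  // uncompressed length
        out.write(packed);
    }
    else
    {
        write_int64_le(out, int64_t{-1});  // -1 marks the buffer as not compressed
        out.write(raw);
    }
}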

Member:

So getting an empty data span here is an error (you may still want to detect it, because its data pointer will be null and the underlying decompressor may not like that).

Member Author:

Well, if you mean that a function called decompress, which is designed to do only that, shouldn't intrinsically receive empty data, then this check could be moved up. Is that what you meant?

Member:

Yes, an empty data buffer here would mean an invalid/corrupt IPC stream. So it's a matter of whether you want to be resilient against that.
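
For example (sketch):

if (data.empty())
{
    // A compressed buffer always carries at least the codec's own header,
    // so an empty span here means the IPC stream is invalid or corrupt.
    throw std::runtime_error("sparrow-ipc: empty compressed buffer in record batch body");
}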

Member:

I would find it more helpful to check the validity of the buffer when we get it from the record batch (because the error message can explicitly state which buffer is empty). This can be done in a dedicated PR where we rework the error handling policy, but that would deserve a TODO.

@Hind-M Hind-M requested a review from Alex-PLACET October 22, 2025 12:44
{
public:
using optionally_owned_buffer = std::variant<std::vector<uint8_t>, std::span<const uint8_t>>;
explicit arrow_array_private_data(std::vector<optionally_owned_buffer>&& buffers);
Member:

Do we mix owned and non-owned buffers? Otherwise a variant of vectors might be more suitable here.

Member:

As I previously commented elsewhere, since some buffers can be compressed and others not, you need this flexibility; otherwise you'll end up copying non-compressed buffers into an owned version.
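
For reference, read access stays uniform over such a mixed variant (a sketch over the optionally_owned_buffer alias above):

#include <cstdint>
#include <span>
#include <variant>
#include <vector>

using optionally_owned_buffer = std::variant<std::vector<uint8_t>, std::span<const uint8_t>>;

// Borrow a read-only view regardless of which alternative holds the bytes.
std::span<const uint8_t> as_span(const optionally_owned_buffer& buf)
{
    return std::visit(
        [](const auto& b) { return std::span<const uint8_t>(b.data(), b.size()); },
        buf);
}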

Member:

(and I think at some point sparrow may want a more versatile buffer facility such that ownership needn't take the form of a std::vector?)

Member:

As I previously commented elsewhere, since some buffers can be compressed and others not, you need this flexibility; otherwise you'll end up copying non-compressed buffers into an owned version.

Ah indeed, I totally forgot it when writing my comment.

}
delete array->dictionary;
array->dictionary = nullptr;
}
Member:

It would be nice to find a way to factorize this implementation and https://github.com/man-group/sparrow/blob/c7feca8ca0dd87c2ead88cfce425ebd121b18c54/include/sparrow/arrow_interface/arrow_array_schema_common_release.hpp#L31 to avoid code duplication. This requires changes in sparrow and should not block this PR, but I think a TODO would be nice here.
