Conversation

@Alex-PLACET
Member

No description provided.

@Alex-PLACET Alex-PLACET self-assigned this Sep 17, 2025
@codecov-commenter

codecov-commenter commented Sep 17, 2025

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 88.83249% with 22 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (main@60a6a51).

Files with missing lines Patch % Lines
src/serialize_utils.cpp 89.24% 17 Missing ⚠️
include/sparrow_ipc/utils.hpp 84.21% 3 Missing ⚠️
include/sparrow_ipc/serialize.hpp 81.81% 2 Missing ⚠️
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.
Additional details and impacted files
@@           Coverage Diff           @@
##             main      #22   +/-   ##
=======================================
  Coverage        ?   77.91%           
=======================================
  Files           ?       22           
  Lines           ?     1028           
  Branches        ?        0           
=======================================
  Hits            ?      801           
  Misses          ?      227           
  Partials        ?        0           
Flag Coverage Δ
unittests 77.91% <88.83%> (?)

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

@Alex-PLACET Alex-PLACET requested a review from Copilot September 17, 2025 12:37
Copilot AI left a comment

Pull Request Overview

This PR adds serialization functionality to the sparrow_ipc library, implementing the ability to serialize Arrow record batches into binary format. This complements the existing deserialization capabilities.

  • Implements comprehensive serialization functionality for Arrow record batches and schemas
  • Adds new test case to verify round-trip serialization/deserialization consistency
  • Introduces utility functions for checking record batch consistency and formatting

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

File Description
tests/test_deserialization_with_files.cpp Added new test case for serialization round-trip testing and comparison utility function
src/serialize_utils.cpp Core serialization implementation with utilities for converting record batches to binary format
include/sparrow_ipc/serialize_utils.hpp Header file defining serialization utility functions and their documentation
include/sparrow_ipc/serialize.hpp Main serialization API for collections of record batches
include/sparrow_ipc/utils.hpp Added record batch consistency checking template function
src/encapsulated_message.cpp Minor bug fix for condition checking
include/sparrow_ipc/magic_values.hpp Removed empty line for formatting consistency
CMakeLists.txt Updated build configuration to include new serialization source files


@Alex-PLACET Alex-PLACET marked this pull request as ready for review September 17, 2025 13:02
    {
        return {};
    }
    if (!utils::check_record_batches_consistency(record_batches))
Member

I think you want a streaming interface at some point, so that not all record batches have to be materialized before serializing.
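
For illustration, a streaming entry point might look something like the sketch below (serialize_stream is a hypothetical name; it reuses the PR's serialize_record_batch and elides schema and end-of-stream handling):

#include <algorithm>
#include <cstdint>
#include <iterator>
#include <ranges>
#include <vector>

// Sketch: flush each batch to the sink as soon as it is serialized, so the
// whole stream never has to be materialized in a single vector.
template <std::ranges::input_range R, std::output_iterator<std::uint8_t> Out>
Out serialize_stream(const R& record_batches, Out out)
{
    for (const sparrow::record_batch& rb : record_batches)
    {
        const std::vector<std::uint8_t> chunk = serialize_record_batch(rb);
        out = std::ranges::copy(chunk, out).out;
    }
    return out;
}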

Member Author

Yes, but it's a first step; we will support streaming later in another PR

Member

Ok, but all the internals in the PR are architected around the idea of serializing all batches at once to a single vector. So you'll have to refactor all of this...

Member

The "low level" methods work on single batches, so I guess we could add a method accepting a single batch that would directly call them. Or if we can optimize this implementation such that serializing a vector of a single record_batch is equivalent to serializing a record_batch directly, we can add something like:

std::vector<uint8_t> serialize(const record_batch& rb)
{
    return serialize(std::ranges::single_view(rb));
}

Member Author

This is addressed in the next PR

Comment on lines 53 to 61
std::vector<uint8_t> serialized_schema = serialize_schema_message(record_batches[0]);
std::vector<uint8_t> serialized_record_batches = serialize_record_batches(record_batches);
serialized_schema.insert(
    serialized_schema.end(),
    std::make_move_iterator(serialized_record_batches.begin()),
    std::make_move_iterator(serialized_record_batches.end())
);
// End of stream message
serialized_schema.insert(serialized_schema.end(), end_of_stream.begin(), end_of_stream.end());
Member

I think this is going to make several memory copies because of resizes along the way:

  1. copies when appending each serialized record batch to the previous one (unless you're extremely careful to presize the output buffer to the right length)
  2. a copy when appending the serialized batches to the serialized schema

So not only do you materialize the entire IPC stream in memory, but there are intermediate copies involved. This is probably not going to perform very nicely on very large data.

Given that IPC can typically be written into a file or socket, this seems suboptimal.
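
A sketch of the presizing idea from point 1 (serialize_presized is a hypothetical name built on the PR's serialize_schema_message / serialize_record_batch; it still materializes everything, but removes the reallocation copies):

#include <cstddef>
#include <cstdint>
#include <vector>

// Sketch: serialize each part once, sum the sizes, then concatenate into a
// buffer that is reserved exactly once.
template <typename R>
std::vector<std::uint8_t> serialize_presized(const R& record_batches)
{
    std::vector<std::vector<std::uint8_t>> parts;
    parts.push_back(serialize_schema_message(record_batches[0]));
    for (const sparrow::record_batch& rb : record_batches)
    {
        parts.push_back(serialize_record_batch(rb));
    }
    std::size_t total = end_of_stream.size();
    for (const auto& part : parts)
    {
        total += part.size();
    }
    std::vector<std::uint8_t> output;
    output.reserve(total);  // single allocation, no resizes while appending
    for (const auto& part : parts)
    {
        output.insert(output.end(), part.begin(), part.end());
    }
    output.insert(output.end(), end_of_stream.begin(), end_of_stream.end());
    return output;
}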

Member Author

It's definitely suboptimal; the point is only to have a first milestone where we are able to follow the serialization specification.

Member

Can you add a TODO: rewrite for performance, or something similar? We should also open an issue to track this once the PR is merged.

Member

A possible optimization could be to have the serialize_xxx implementation methods accept a std::vector<std::uint8_t>, or an insert_iterator, instead of returning a std::vector<std::uint8_t>. This way, the "driving" method could allocate a single vector and reserve additional memory between calls to the lower-layer methods when possible (this can be done in a dedicated PR when optimizing performance).
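
Sketched, that shape could look like this (the out-parameter signatures are hypothetical; only the function names come from the PR):

#include <cstdint>
#include <ranges>
#include <vector>

// Lower-level helpers append into a caller-provided buffer instead of
// returning a fresh vector each time.
void serialize_schema_message(const sparrow::record_batch& record_batch,
                              std::vector<std::uint8_t>& out);
void serialize_record_batch(const sparrow::record_batch& record_batch,
                            std::vector<std::uint8_t>& out);

// The "driving" method owns the single output vector.
template <typename R>
std::vector<std::uint8_t> serialize(const R& record_batches)
{
    std::vector<std::uint8_t> out;
    serialize_schema_message(*std::ranges::begin(record_batches), out);
    for (const sparrow::record_batch& rb : record_batches)
    {
        serialize_record_batch(rb, out);  // appends in place
    }
    out.insert(out.end(), end_of_stream.begin(), end_of_stream.end());
    return out;
}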

Member Author

This is addressed in a follow-up PR

Member Author

This is addressed in the next PR

const size_t first_rb_nb_columns = first_rb.nb_columns();
for (const sparrow::record_batch& rb : record_batches)
{
    const auto rb_nb_columns = rb.nb_columns();
Member

I'm surprised you don't have a schema comparison function to automate all this instead of comparing columns one by one?

Member Author

We have schema comparison, but our record_batch doesn't use ArrowSchema

Member

Same question as here

* with continuation bytes, 4-byte length prefix, schema data, and 8-byte alignment padding
*/
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
serialize_schema_message(const sparrow::record_batch& record_batch);
Member

I'm curious: you don't have a sparrow::schema abstraction that can be passed here instead of a record batch?

Member Author

Our record_batch doesn't use ArrowSchema; we have to transform it to a struct array first

Member

Now that your PR adding extraction to batches has been merged, we could release sparrow and rebase this PR on it? This can also be done in a future PR.

Member Author

I don't think that we should extract it to a struct array in that case.

* in Arrow IPC format, ready for transmission or storage
*/
[[nodiscard]] SPARROW_IPC_API std::vector<uint8_t>
serialize_record_batch(const sparrow::record_batch& record_batch);
Member

How does this work for dictionary arrays? (or it doesn't?)

Member Author

We don't support it currently

    const auto value_offset = builder.CreateString(std::string(value));
    kv_offsets.push_back(org::apache::arrow::flatbuf::CreateKeyValue(builder, key_offset, value_offset));
}
return builder.CreateVector(kv_offsets);
Member

Suggested change
return builder.CreateVector(kv_offsets);
return builder.CreateVector(std::move(kv_offsets));

    flatbuffers::Offset<org::apache::arrow::flatbuf::Field> field = create_field(builder, child);
    children_vec.emplace_back(field);
}
return children_vec.empty() ? 0 : builder.CreateVector(children_vec);
Member

Same here and below: move children_vec

Member Author

CreateVector takes const ref

const auto fields_vec = create_children(schema_builder, record_batch.columns());
const auto schema_offset = org::apache::arrow::flatbuf::CreateSchema(
    schema_builder,
    org::apache::arrow::flatbuf::Endianness::Little, // TODO: make configurable
Member

Rather than configurable, it should probably mirror the machine's endianness?
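
With C++20's <bit> header, mirroring the host is a one-liner; a sketch:

#include <bit>

// Select the flatbuffer endianness tag from the host's native byte order.
constexpr auto host_endianness = std::endian::native == std::endian::little
    ? org::apache::arrow::flatbuf::Endianness::Little
    : org::apache::arrow::flatbuf::Endianness::Big;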

Comment on lines 127 to 128
utils::align_to_8(static_cast<int64_t>(schema_buffer.size()))
- static_cast<int64_t>(schema_buffer.size()),
Member

Don't you want some kind of helper function for the padding calculation? You're computing it in fill_body as well.
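
A sketch of such a helper, built on the existing utils::align_to_8 (padding_to_8 is a hypothetical name):

// Bytes of padding needed to reach the next 8-byte boundary; usable from
// both the schema path and fill_body.
inline int64_t padding_to_8(int64_t size)
{
    return utils::align_to_8(size) - size;
}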

Member Author

done

Comment on lines +304 to +292
std::vector<uint8_t> body = generate_body(record_batch);
output.insert(output.end(), std::make_move_iterator(body.begin()), std::make_move_iterator(body.end()));
Member

This is making another copy...

}
}

// Helper function to create a simple ArrowSchema for testing
Member

Nitpick: not indented correctly.

Member

Ah the indentation here doesn't seem to be changed.

@Hind-M
Member

Hind-M commented Sep 18, 2025

Looking at the linux job (from source) failures, it seems we're missing #include <cstdint> in magic_values.hpp and the std:: qualification on uint8_t.
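
For reference, the fix described above amounts to (assuming <array> is already included in magic_values.hpp):

#include <cstdint>  // provides std::uint8_t

constexpr std::array<std::uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};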

@JohanMabille JohanMabille left a comment

As mentioned in one of the comments, a possible way to improve performance and limit the allocations is to have the different methods accept a std::vector or an iterator instead of returning a std::vector. This would make it easy to have different high-level APIs (one returning a single vector, one returning a vector of vectors if that seems more appropriate, one accepting a stream that would call the underlying method with a stream_iterator, etc.). This refactoring can be done in a dedicated PR when we have a reference implementation to test against.

* https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
*/
constexpr std::array<uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
constexpr std::array<std::uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
Member

Suggested change
constexpr std::array<std::uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
inline constexpr std::array<std::uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};

* https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
*/
constexpr std::array<uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
constexpr std::array<std::uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
Member

Suggested change
constexpr std::array<std::uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
inline constexpr std::array<std::uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};

* @note The function reserves memory for the vector based on the metadata size for
* optimal performance.
*/
[[nodiscard]] SPARROW_IPC_API
Member

I agree, this dependency should not leak. Given that this function and create_field are used in serialize_utils only, they should not be declared here. Besides, their visibility should be private/hidden.

If there is a need to test them, then their visibility can be conditionally changed to public (only when building the tests for instance), and they should be declared as extern in the test library.
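
A sketch of that conditional visibility (the macro and build flag names are hypothetical):

// Export internal helpers only when the test build asks for it.
#if defined(SPARROW_IPC_BUILD_TESTS)
#    define SPARROW_IPC_INTERNAL_API SPARROW_IPC_API
#else
#    define SPARROW_IPC_INTERNAL_API
#endif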

* @note The function uses move iterators to efficiently transfer the serialized data
* from individual record batches to the output vector.
*/
[[nodiscard]] std::vector<uint8_t> serialize_record_batches(const R& record_batches)
Member

I find the name a bit confusing; at first sight, this method looks very similar to the public API serialize(const R& record_batches). Even if the documentation explains the differences in detail, the names should reflect these differences so that a reader can quickly see that these methods are different.

@Alex-PLACET
Member Author

As mentioned in one of the comments, a possible way to improve performance and limit the allocations is to have the different methods accept a std::vector or an iterator instead of returning a std::vector. This would make it easy to have different high-level APIs (one returning a single vector, one returning a vector of vectors if that seems more appropriate, one accepting a stream that would call the underlying method with a stream_iterator, etc.). This refactoring can be done in a dedicated PR when we have a reference implementation to test against.

This will be done in the next PR

@Alex-PLACET
Member Author

Alex-PLACET commented Sep 23, 2025

Can you add a TODO: rewrite for performance, or something similar? We should also open an issue to track this once the PR is merged.

Done in the next PR

@JohanMabille JohanMabille left a comment

Let's open some issues to track what will be fixed in upcoming PRs when this is merged.

Add documentation

wip

add tests

cleaning

Address review

Fix

fix compilation try

fix

formatting

review
@JohanMabille
Member

Alright, issues have been opened to track unaddressed comments here, let's merge.

@JohanMabille JohanMabille merged commit 6ff5bc7 into QuantStack:main Sep 25, 2025
22 of 27 checks passed
@Alex-PLACET Alex-PLACET deleted the serialize_record_batch branch September 25, 2025 08:36