Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,18 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/metadata.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/utils.hpp
)

set(SPARROW_IPC_SRC
${SPARROW_IPC_SOURCE_DIR}/serialize_utils.cpp
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array.cpp
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array/private_data.cpp
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp
Expand Down
4 changes: 2 additions & 2 deletions cmake/external_dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ function(find_package_or_fetch)
FetchContent_MakeAvailable(${arg_PACKAGE_NAME})
message(STATUS "\t✅ Fetched ${arg_PACKAGE_NAME}")
else()
message(STATUS "📦 ${actual_pkg_name} found here: ${actual_pkg_name}_DIR")
message(STATUS "📦 ${actual_pkg_name} found here: ${${actual_pkg_name}_DIR}")
endif()
endif()
endfunction()
Expand All @@ -52,7 +52,7 @@ endif()
find_package_or_fetch(
PACKAGE_NAME sparrow
GIT_REPOSITORY https://github.com/man-group/sparrow.git
TAG 1.1.1
TAG 1.1.2
)
unset(CREATE_JSON_READER_TARGET)

Expand Down
8 changes: 4 additions & 4 deletions include/sparrow_ipc/magic_values.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@

#include <algorithm>
#include <array>
#include <cstdint>
#include <istream>

namespace sparrow_ipc
{

/**
* Continuation value defined in the Arrow IPC specification:
* https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
*/
constexpr std::array<uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
inline constexpr std::array<std::uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};

/**
* End-of-stream marker defined in the Arrow IPC specification:
* https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
*/
constexpr std::array<uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
inline constexpr std::array<std::uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};

template <std::ranges::input_range R>
[[nodiscard]] bool is_continuation(const R& buf)
Expand All @@ -30,4 +30,4 @@ namespace sparrow_ipc
{
return std::ranges::equal(buf, end_of_stream);
}
}
}
64 changes: 64 additions & 0 deletions include/sparrow_ipc/serialize.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#pragma once

#include <ostream>
#include <ranges>
#include <vector>

#include <sparrow/record_batch.hpp>

#include "Message_generated.h"
#include "sparrow_ipc/config/config.hpp"
#include "sparrow_ipc/magic_values.hpp"
#include "sparrow_ipc/serialize_utils.hpp"
#include "sparrow_ipc/utils.hpp"

namespace sparrow_ipc
{
/**
* @brief Serializes a collection of record batches into a binary format.
*
* This function takes a collection of record batches and serializes them into a single
* binary representation following the Arrow IPC format. The serialization includes:
* - Schema message (derived from the first record batch)
* - All record batch data
* - End-of-stream marker
*
* @tparam R Container type that holds record batches (must support empty(), operator[], begin(), end())
* @param record_batches Collection of record batches to serialize. All batches must have identical
* schemas.
*
* @return std::vector<uint8_t> Binary serialized data containing schema, record batches, and
* end-of-stream marker. Returns empty vector if input collection is empty.
*
* @throws std::invalid_argument If record batches have inconsistent schemas or if the collection
* contains batches that cannot be serialized together.
*
* @pre All record batches in the collection must have the same schema
* @pre The container R must not be empty when consistency checking is required
*/
template <std::ranges::input_range R>
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
std::vector<uint8_t> serialize(const R& record_batches)
{
if (record_batches.empty())
{
return {};
}
if (!utils::check_record_batches_consistency(record_batches))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want a streaming interface at some point, so that not all record batches have to be materialized before serializing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but it's a first step, we will support streaming later in another PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, but all the internals in the PR are architected around the idea of serializing all batches at once to a single vector. So you'll have to refactor all of this...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "low level" methods work on single batches, so I guess we could add a method accepting a single batch that would directly call them. Or if we can optimize this implementation such that serializing a vector of a single record_batch is equivalent to serializing a record_batch directly, we can add something like:

std::vector<uint8_t> serialize(const record_batch& rb)
{
    return serialize(std::ranges::single_view(rb));
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is addressed in the next PR

{
throw std::invalid_argument(
"All record batches must have the same schema to be serialized together."
);
}
std::vector<uint8_t> serialized_schema = serialize_schema_message(record_batches[0]);
std::vector<uint8_t> serialized_record_batches = serialize_record_batches_without_schema_message(record_batches);
serialized_schema.insert(
serialized_schema.end(),
std::make_move_iterator(serialized_record_batches.begin()),
std::make_move_iterator(serialized_record_batches.end())
);
// End of stream message
serialized_schema.insert(serialized_schema.end(), end_of_stream.begin(), end_of_stream.end());
return serialized_schema;
}
}
Loading
Loading