Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ set(SPARROW_IPC_SRC
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp
${SPARROW_IPC_SOURCE_DIR}/chunk_memory_serializer.cpp
${SPARROW_IPC_SOURCE_DIR}/compression.cpp
${SPARROW_IPC_SOURCE_DIR}/compression_impl.hpp
${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp
${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp
Expand Down
8 changes: 3 additions & 5 deletions include/sparrow_ipc/chunk_memory_serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@

#include <sparrow/record_batch.hpp>

#include "Message_generated.h"

#include "sparrow_ipc/any_output_stream.hpp"
#include "sparrow_ipc/chunk_memory_output_stream.hpp"
#include "sparrow_ipc/compression.hpp"
#include "sparrow_ipc/config/config.hpp"
#include "sparrow_ipc/memory_output_stream.hpp"
#include "sparrow_ipc/serialize.hpp"
Expand Down Expand Up @@ -44,8 +43,7 @@ namespace sparrow_ipc
* @param stream Reference to a chunked memory output stream that will receive the serialized chunks
* @param compression Optional: The compression type to use for record batch bodies.
*/
// TODO Use enums and such to avoid including flatbuffers headers
chunk_serializer(chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream, std::optional<org::apache::arrow::flatbuf::CompressionType> compression = std::nullopt);
chunk_serializer(chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream, std::optional<CompressionType> compression = std::nullopt);

/**
* @brief Writes a single record batch to the chunked stream.
Expand Down Expand Up @@ -131,7 +129,7 @@ namespace sparrow_ipc
std::vector<sparrow::data_type> m_dtypes;
chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>* m_pstream;
bool m_ended{false};
std::optional<org::apache::arrow::flatbuf::CompressionType> m_compression;
std::optional<CompressionType> m_compression;
};

// Implementation
Expand Down
25 changes: 11 additions & 14 deletions include/sparrow_ipc/compression.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@
#include <variant>
#include <vector>

#include "Message_generated.h"

#include "sparrow_ipc/config/config.hpp"

namespace sparrow_ipc
{
// TODO use these later if needed for wrapping purposes (flatbuffers/lz4)
// enum class CompressionType
// {
// NONE,
// LZ4,
// ZSTD
// };

// CompressionType to_compression_type(org::apache::arrow::flatbuf::CompressionType compression_type);
/**
 * @brief Compression codecs that can be requested for record batch bodies.
 *
 * This is the library's own enum, used in public signatures in place of the
 * flatbuffers-generated compression type.
 *
 * NOTE(review): the numeric values are spelled out only to make the existing
 * implicit numbering visible; whether they are meant to mirror the Arrow
 * flatbuffers enum is not established here - confirm before casting between them.
 */
enum class CompressionType
{
    /// LZ4 frame format.
    LZ4_FRAME = 0,
    /// Zstandard.
    ZSTD = 1
};

constexpr auto CompressionHeaderSize = sizeof(std::int64_t);
[[nodiscard]] SPARROW_IPC_API std::vector<std::uint8_t> compress(
const CompressionType compression_type,
std::span<const std::uint8_t> data);

[[nodiscard]] SPARROW_IPC_API std::vector<std::uint8_t> compress(const org::apache::arrow::flatbuf::CompressionType compression_type, std::span<const std::uint8_t> data);
[[nodiscard]] SPARROW_IPC_API std::variant<std::vector<std::uint8_t>, std::span<const std::uint8_t>> decompress(const org::apache::arrow::flatbuf::CompressionType compression_type, std::span<const std::uint8_t> data);
[[nodiscard]] SPARROW_IPC_API std::variant<std::vector<std::uint8_t>, std::span<const std::uint8_t>> decompress(
const CompressionType compression_type,
std::span<const std::uint8_t> data);
}
11 changes: 1 addition & 10 deletions include/sparrow_ipc/deserialize_primitive_array.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,7 @@ namespace sparrow_ipc

if (compression)
{
// TODO Handle buffers emptiness thoroughly / which is and which is not allowed...
// Validity buffers can be empty
if (validity_buffer_span.empty())
{
buffers.push_back(validity_buffer_span);
}
else
{
buffers.push_back(utils::get_decompressed_buffer(validity_buffer_span, compression));
}
buffers.push_back(utils::get_decompressed_buffer(validity_buffer_span, compression));
buffers.push_back(utils::get_decompressed_buffer(data_buffer_span, compression));
}
else
Expand Down
44 changes: 10 additions & 34 deletions include/sparrow_ipc/deserialize_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,22 @@ namespace sparrow_ipc::utils
);

/**
* @brief Extracts bitmap pointer and null count from a RecordBatch buffer.
*
* This function retrieves a bitmap buffer from the specified index in the RecordBatch's
* buffer list and calculates the number of null values represented by the bitmap.
* @brief Extracts a buffer from a RecordBatch's body.
*
* @param record_batch The Arrow RecordBatch containing buffer metadata
* @param body The raw buffer data as a byte span
* @param index The index of the bitmap buffer in the RecordBatch's buffer list
* This function retrieves a buffer span from the specified index in the RecordBatch's
* buffer list and increments the index.
*
* @return A pair containing:
* - First: Pointer to the bitmap data (nullptr if buffer is empty)
* - Second: Count of null values in the bitmap (0 if buffer is empty)
* @param record_batch The Arrow RecordBatch containing buffer metadata.
* @param body The raw buffer data as a byte span.
* @param buffer_index The index of the buffer to retrieve. This value is incremented by the function.
*
* @note If the bitmap buffer has zero length, returns {nullptr, 0}
* @note The returned pointer is a non-const cast of the original const data
* @return A `std::span<const uint8_t>` viewing the extracted buffer data.
* @throws std::runtime_error if the buffer metadata indicates a buffer that exceeds the body size.
*/
// TODO to be removed when not used anymore (after adding compression to deserialize_fixedsizebinary_array)
[[nodiscard]] std::pair<std::uint8_t*, int64_t> get_bitmap_pointer_and_null_count(
[[nodiscard]] std::span<const uint8_t> get_buffer(
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
std::span<const uint8_t> body,
size_t index
size_t& buffer_index
);

/**
Expand All @@ -72,23 +67,4 @@ namespace sparrow_ipc::utils
std::span<const uint8_t> buffer_span,
const org::apache::arrow::flatbuf::BodyCompression* compression
);

/**
* @brief Extracts a buffer from a RecordBatch's body.
*
* This function retrieves a buffer span from the specified index in the RecordBatch's
* buffer list and increments the index.
*
* @param record_batch The Arrow RecordBatch containing buffer metadata.
* @param body The raw buffer data as a byte span.
* @param buffer_index The index of the buffer to retrieve. This value is incremented by the function.
*
* @return A `std::span<const uint8_t>` viewing the extracted buffer data.
* @throws std::runtime_error if the buffer metadata indicates a buffer that exceeds the body size.
*/
[[nodiscard]] std::span<const uint8_t> get_buffer(
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
std::span<const uint8_t> body,
size_t& buffer_index
);
}
10 changes: 1 addition & 9 deletions include/sparrow_ipc/deserialize_variable_size_binary_array.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,7 @@ namespace sparrow_ipc

if (compression)
{
// Validity buffers can be empty
if (validity_buffer_span.empty())
{
buffers.push_back(validity_buffer_span);
}
else
{
buffers.push_back(utils::get_decompressed_buffer(validity_buffer_span, compression));
}
buffers.push_back(utils::get_decompressed_buffer(validity_buffer_span, compression));
buffers.push_back(utils::get_decompressed_buffer(offset_buffer_span, compression));
buffers.push_back(utils::get_decompressed_buffer(data_buffer_span, compression));
}
Expand Down
102 changes: 101 additions & 1 deletion include/sparrow_ipc/flatbuffer_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include <sparrow/c_interface.hpp>
#include <sparrow/record_batch.hpp>

#include "sparrow_ipc/compression.hpp"
#include "sparrow_ipc/utils.hpp"

namespace sparrow_ipc
{
// Creates a Flatbuffers Decimal type from a format string
Expand Down Expand Up @@ -164,6 +167,42 @@ namespace sparrow_ipc
[[nodiscard]] std::vector<org::apache::arrow::flatbuf::FieldNode>
create_fieldnodes(const sparrow::record_batch& record_batch);

namespace details
{
    /**
     * @brief Appends one flatbuffer Buffer entry per buffer of an Arrow proxy, then
     *        does the same for each of its children.
     *
     * For every buffer of @p arrow_proxy, the size is obtained from
     * @p get_buffer_size, an entry `(offset, size)` is appended to
     * @p flatbuf_buffers, and @p offset is advanced by `utils::align_to_8(size)`.
     * The proxy's own buffers are emitted before those of its children, and the
     * children are visited in order with the same callable.
     *
     * @tparam Func Callable invoked with a single buffer of the proxy; its result
     *              is stored in an `int64_t`.
     * @param arrow_proxy The Arrow proxy whose buffers (and children) are walked.
     * @param flatbuf_buffers Output vector receiving the appended entries.
     * @param offset Running offset; read for each entry and updated in place.
     * @param get_buffer_size Callable returning the size to record for a buffer.
     */
    template <typename Func>
    void fill_buffers_impl(
        const sparrow::arrow_proxy& arrow_proxy,
        std::vector<org::apache::arrow::flatbuf::Buffer>& flatbuf_buffers,
        int64_t& offset,
        Func&& get_buffer_size
    )
    {
        // Own buffers first: record (offset, size), then move past the aligned size.
        for (const auto& current_buffer : arrow_proxy.buffers())
        {
            const int64_t entry_size = get_buffer_size(current_buffer);
            flatbuf_buffers.emplace_back(offset, entry_size);
            offset += utils::align_to_8(entry_size);
        }

        // Then descend into the children, sharing the same output vector and offset.
        for (const auto& child_proxy : arrow_proxy.children())
        {
            fill_buffers_impl(child_proxy, flatbuf_buffers, offset, get_buffer_size);
        }
    }

    /**
     * @brief Builds the list of flatbuffer Buffer entries for a whole record batch.
     *
     * Starts from offset 0 and, for each column of @p record_batch, fetches the
     * column's Arrow proxy and calls
     * `fill_buffers_func(proxy, buffers, offset)` so the callable can append
     * entries and advance the shared offset.
     *
     * @tparam Func Callable taking (Arrow proxy, output vector, running offset).
     * @param record_batch The record batch whose columns are traversed in order.
     * @param fill_buffers_func Callable that appends the entries for one column.
     * @return The accumulated vector of Buffer entries.
     */
    template <typename Func>
    std::vector<org::apache::arrow::flatbuf::Buffer> get_buffers_impl(
        const sparrow::record_batch& record_batch,
        Func&& fill_buffers_func
    )
    {
        std::vector<org::apache::arrow::flatbuf::Buffer> collected;
        int64_t running_offset = 0;
        for (const auto& col : record_batch.columns())
        {
            const auto& column_proxy = sparrow::detail::array_access::get_arrow_proxy(col);
            fill_buffers_func(column_proxy, collected, running_offset);
        }
        return collected;
    }
}  // namespace details

/**
* @brief Recursively fills a vector of FlatBuffer Buffer objects with buffer information from an Arrow
Expand Down Expand Up @@ -205,6 +244,67 @@ namespace sparrow_ipc
[[nodiscard]] std::vector<org::apache::arrow::flatbuf::Buffer>
get_buffers(const sparrow::record_batch& record_batch);

/**
* @brief Recursively populates a vector with compressed buffer metadata from an Arrow proxy.
*
* This function traverses the Arrow proxy and its children, compressing each buffer and recording
* its metadata (offset and size) in the provided vector. The offset is updated to ensure proper
* alignment for each subsequent buffer.
*
* @param arrow_proxy The Arrow proxy containing the buffers to be compressed.
* @param flatbuf_compressed_buffers A vector to store the resulting compressed buffer metadata.
* @param offset The current offset in the buffer layout, which will be updated by the function.
* @param compression_type The compression algorithm to use.
*/
void fill_compressed_buffers(
const sparrow::arrow_proxy& arrow_proxy,
std::vector<org::apache::arrow::flatbuf::Buffer>& flatbuf_compressed_buffers,
int64_t& offset,
const CompressionType compression_type
);

/**
* @brief Retrieves metadata describing the layout of compressed buffers within a record batch.
*
* This function processes a record batch to determine the metadata (offset and size)
* for each of its buffers, assuming they are compressed using the specified algorithm.
* The metadata accounts for each compressed buffer being prefixed by an 8-byte field
* holding its uncompressed size, and for the padding that keeps each buffer 8-byte aligned.
*
* @param record_batch The record batch whose buffers' compressed metadata is to be retrieved.
* @param compression_type The compression algorithm that would be applied (e.g., LZ4_FRAME, ZSTD).
* @return A vector of FlatBuffer Buffer objects, each describing the offset and
* size of a corresponding compressed buffer within a larger message body.
*/
[[nodiscard]] std::vector<org::apache::arrow::flatbuf::Buffer>
get_compressed_buffers(const sparrow::record_batch& record_batch, const CompressionType compression_type);

/**
* @brief Calculates the total size of the body section for an Arrow array.
*
* This function recursively computes the total size needed for all buffers
* in an Arrow array structure, including buffers from child arrays. Each
* buffer size is aligned to 8-byte boundaries as required by the Arrow format.
*
* @param arrow_proxy The Arrow array proxy containing buffers and child arrays
* @param compression The compression type to use when serializing
* @return int64_t The total aligned size in bytes of all buffers in the array hierarchy
*/
[[nodiscard]] int64_t calculate_body_size(const sparrow::arrow_proxy& arrow_proxy, std::optional<CompressionType> compression = std::nullopt);

/**
* @brief Calculates the total body size of a record batch by summing the body sizes of all its columns.
*
* This function iterates through all columns in the given record batch and accumulates
* the body size of each column's underlying Arrow array proxy. The body size represents
* the total memory required for the serialized data content of the record batch.
*
* @param record_batch The sparrow record batch containing columns to calculate size for
* @param compression The compression type to use when serializing
* @return int64_t The total body size in bytes of all columns in the record batch
*/
[[nodiscard]] int64_t calculate_body_size(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression = std::nullopt);

/**
* @brief Creates a FlatBuffer message containing a serialized Apache Arrow RecordBatch.
*
Expand All @@ -222,5 +322,5 @@ namespace sparrow_ipc
* @note Variadic buffer counts are not currently implemented (set to 0)
*/
[[nodiscard]] flatbuffers::FlatBufferBuilder
get_record_batch_message_builder(const sparrow::record_batch& record_batch, std::optional<org::apache::arrow::flatbuf::CompressionType> compression = std::nullopt);
get_record_batch_message_builder(const sparrow::record_batch& record_batch, std::optional<CompressionType> compression = std::nullopt);
}
5 changes: 3 additions & 2 deletions include/sparrow_ipc/serialize.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "Message_generated.h"
#include "sparrow_ipc/any_output_stream.hpp"
#include "sparrow_ipc/compression.hpp"
#include "sparrow_ipc/config/config.hpp"
#include "sparrow_ipc/magic_values.hpp"
#include "sparrow_ipc/serialize_utils.hpp"
Expand Down Expand Up @@ -36,7 +37,7 @@ namespace sparrow_ipc
*/
template <std::ranges::input_range R>
requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
void serialize_record_batches_to_ipc_stream(const R& record_batches, any_output_stream& stream, std::optional<org::apache::arrow::flatbuf::CompressionType> compression)
void serialize_record_batches_to_ipc_stream(const R& record_batches, any_output_stream& stream, std::optional<CompressionType> compression)
{
if (record_batches.empty())
{
Expand Down Expand Up @@ -76,7 +77,7 @@ namespace sparrow_ipc
*/

SPARROW_IPC_API void
serialize_record_batch(const sparrow::record_batch& record_batch, any_output_stream& stream, std::optional<org::apache::arrow::flatbuf::CompressionType> compression);
serialize_record_batch(const sparrow::record_batch& record_batch, any_output_stream& stream, std::optional<CompressionType> compression);

/**
* @brief Serializes a schema message for a record batch into a byte buffer.
Expand Down
Loading
Loading