Skip to content

Commit 6359136

Browse files
committed
wip
1 parent 9294c41 commit 6359136

15 files changed

+147
-59
lines changed

include/sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ namespace sparrow_ipc
2828
}
2929
SPARROW_ASSERT_TRUE(t.private_data != nullptr);
3030
const auto private_data = static_cast<const private_data_type*>(t.private_data);
31+
delete private_data;
32+
t.private_data = nullptr;
3133

3234
if (t.dictionary)
3335
{

include/sparrow_ipc/config/sparrow_ipc_version.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace sparrow_ipc
66
constexpr int SPARROW_IPC_VERSION_MINOR = 1;
77
constexpr int SPARROW_IPC_VERSION_PATCH = 0;
88

9-
constexpr int SPARROW_IPC_BINARY_CURRENT = 9;
9+
constexpr int SPARROW_IPC_BINARY_CURRENT = 1;
1010
constexpr int SPARROW_IPC_BINARY_REVISION = 0;
1111
constexpr int SPARROW_IPC_BINARY_AGE = 0;
1212
}

include/sparrow_ipc/deserialize.hpp

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,31 @@
55

66
#include <sparrow/record_batch.hpp>
77

8-
#include "config/config.hpp"
98
#include "Message_generated.h"
9+
#include "sparrow_ipc/config/config.hpp"
1010
#include "sparrow_ipc/encapsulated_message.hpp"
11-
#include "SparseTensor_generated.h"
1211

1312
namespace sparrow_ipc
1413
{
14+
/**
15+
* @brief Deserializes a schema message from Arrow IPC format data.
16+
*
17+
* This function parses an Arrow IPC schema message from a byte buffer, extracting
18+
* the field name and custom metadata from the first (and expected only) field in the schema.
19+
*
20+
* @param data A span containing the raw byte data to deserialize from
21+
* @param current_offset Reference to the current position in the data buffer, which will be
22+
* updated to point past the processed schema message
23+
* @param name Optional output parameter that will contain the field name if present
24+
* @param metadata Optional output parameter that will contain the custom metadata
25+
* key-value pairs if present
26+
*
27+
* @throws std::runtime_error If the message is not a Schema message type
28+
* @throws std::runtime_error If the schema does not contain exactly one field
29+
*
30+
* @note This function expects the data to start with a 4-byte length prefix followed
31+
* by the FlatBuffer schema message data
32+
*/
1533
SPARROW_IPC_API void deserialize_schema_message(
1634
std::span<const uint8_t> data,
1735
size_t& current_offset,
@@ -21,6 +39,25 @@ namespace sparrow_ipc
2139
[[nodiscard]] SPARROW_IPC_API const org::apache::arrow::flatbuf::RecordBatch*
2240
deserialize_record_batch_message(std::span<const uint8_t> data, size_t& current_offset);
2341

42+
/**
43+
* @brief Deserializes an Arrow IPC stream from binary data into a vector of record batches.
44+
*
45+
* This function processes an Arrow IPC stream format, extracting schema information
46+
* and record batch data. It handles encapsulated messages sequentially, first expecting
47+
* a Schema message followed by one or more RecordBatch messages.
48+
*
49+
* @param data A span of bytes containing the serialized Arrow IPC stream data
50+
*
51+
* @return std::vector<sparrow::record_batch> A vector containing all deserialized record batches
52+
*
53+
* @throws std::runtime_error If:
54+
* - A RecordBatch message is encountered before a Schema message
55+
* - A RecordBatch message header is missing or invalid
56+
* - Unsupported message types are encountered (Tensor, DictionaryBatch, SparseTensor)
57+
* - An unknown message header type is encountered
58+
*
59+
* @note The function processes messages until an end-of-stream marker is detected
60+
*/
2461
[[nodiscard]] SPARROW_IPC_API std::vector<sparrow::record_batch>
2562
deserialize_stream(std::span<const uint8_t> data);
2663
}

include/sparrow_ipc/deserialize_primitive_array.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ namespace sparrow_ipc
4040
buffer_index++
4141
);
4242
const auto primitive_buffer_metadata = record_batch.buffers()->Get(buffer_index++);
43+
if (body.size() < (primitive_buffer_metadata->offset() + primitive_buffer_metadata->length()))
44+
{
45+
throw std::runtime_error("Primitive buffer exceeds body size");
46+
}
4347
auto primitives_ptr = const_cast<uint8_t*>(body.data() + primitive_buffer_metadata->offset());
4448
std::vector<std::uint8_t*> buffers = {bitmap_ptr, primitives_ptr};
4549
ArrowArray array = make_non_owning_arrow_array(

include/sparrow_ipc/deserialize_utils.hpp

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,33 @@
11
#pragma once
22

33
#include <span>
4+
#include <utility>
45

56
#include <sparrow/buffer/dynamic_bitset/dynamic_bitset_view.hpp>
67
#include <sparrow/u8_buffer.hpp>
7-
#include <utility>
88

99
#include "Message_generated.h"
1010
#include "Schema_generated.h"
1111

1212
namespace sparrow_ipc::utils
1313
{
14-
template <typename T>
15-
[[nodiscard]] sparrow::u8_buffer<T> message_buffer_to_u8buffer(
16-
const org::apache::arrow::flatbuf::RecordBatch* record_batch,
17-
std::span<const uint8_t> body,
18-
size_t index
19-
)
20-
{
21-
const auto buffer_metadata = record_batch->buffers()->Get(index);
22-
auto ptr = const_cast<uint8_t*>(body.data() + buffer_metadata->offset());
23-
auto casted_ptr = reinterpret_cast<T*>(ptr);
24-
const std::size_t count = static_cast<std::size_t>(buffer_metadata->length() / sizeof(T));
25-
return sparrow::u8_buffer<T>{casted_ptr, count};
26-
}
27-
28-
[[nodiscard]] const sparrow::dynamic_bitset_view<const std::uint8_t> message_buffer_to_validity_bitmap(
29-
const org::apache::arrow::flatbuf::RecordBatch* record_batch,
30-
std::span<const uint8_t> body,
31-
size_t index
32-
);
33-
14+
/**
15+
* @brief Extracts bitmap pointer and null count from a RecordBatch buffer.
16+
*
17+
* This function retrieves a bitmap buffer from the specified index in the RecordBatch's
18+
* buffer list and calculates the number of null values represented by the bitmap.
19+
*
20+
* @param record_batch The Arrow RecordBatch containing buffer metadata
21+
* @param body The raw buffer data as a byte span
22+
* @param index The index of the bitmap buffer in the RecordBatch's buffer list
23+
*
24+
* @return A pair containing:
25+
* - First: Pointer to the bitmap data (nullptr if buffer is empty)
26+
* - Second: Count of null values in the bitmap (0 if buffer is empty)
27+
*
28+
* @note If the bitmap buffer has zero length, returns {nullptr, 0}
29+
* @note The returned pointer is a non-const cast of the original const data
30+
*/
3431
[[nodiscard]] std::pair<std::uint8_t*, int64_t> get_bitmap_pointer_and_null_count(
3532
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
3633
std::span<const uint8_t> body,

include/sparrow_ipc/deserialize_variable_size_binary_array.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,18 @@ namespace sparrow_ipc
3636
body,
3737
buffer_index++
3838
);
39+
3940
const auto offset_metadata = record_batch.buffers()->Get(buffer_index++);
41+
if ((offset_metadata->offset() + offset_metadata->length()) > body.size())
42+
{
43+
throw std::runtime_error("Offset buffer exceeds body size");
44+
}
4045
auto offset_ptr = const_cast<uint8_t*>(body.data() + offset_metadata->offset());
4146
const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++);
47+
if ((buffer_metadata->offset() + buffer_metadata->length()) > body.size())
48+
{
49+
throw std::runtime_error("Data buffer exceeds body size");
50+
}
4251
auto buffer_ptr = const_cast<uint8_t*>(body.data() + buffer_metadata->offset());
4352
std::vector<std::uint8_t*> buffers = {bitmap_ptr, offset_ptr, buffer_ptr};
4453
ArrowArray array = make_non_owning_arrow_array(

include/sparrow_ipc/magic_values.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,25 @@
66

77
namespace sparrow_ipc
88
{
9+
10+
/**
11+
* Continuation value defined in the Arrow IPC specification:
12+
* https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
13+
*/
914
constexpr std::array<uint8_t, 4> continuation = {0xFF, 0xFF, 0xFF, 0xFF};
1015

16+
/**
17+
* End-of-stream marker defined in the Arrow IPC specification:
18+
* https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
19+
*/
1120
constexpr std::array<uint8_t, 8> end_of_stream = {0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00};
1221

1322
template <std::ranges::input_range R>
1423
[[nodiscard]] bool is_continuation(const R& buf)
1524
{
1625
return std::ranges::equal(buf, continuation);
1726
}
27+
1828
template <std::ranges::input_range R>
1929
[[nodiscard]] bool is_end_of_stream(const R& buf)
2030
{

include/sparrow_ipc/metadata.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,20 @@
1010

1111
namespace sparrow_ipc
1212
{
13+
/**
14+
* @brief Converts FlatBuffers metadata to Sparrow metadata format.
15+
*
16+
* This function takes a FlatBuffers vector containing key-value pairs from Apache Arrow
17+
* format and converts them into a vector of Sparrow metadata pairs. Each key-value pair
18+
* from the FlatBuffers structure is extracted and stored as a sparrow::metadata_pair.
19+
*
20+
* @param metadata A FlatBuffers vector containing KeyValue pairs from Apache Arrow format
21+
* @return std::vector<sparrow::metadata_pair> A vector of Sparrow metadata pairs containing
22+
* the converted key-value data
23+
*
24+
* @note The function reserves space in the output vector to match the input size for
25+
* optimal memory allocation performance.
26+
*/
1327
std::vector<sparrow::metadata_pair> to_sparrow_metadata(
1428
const ::flatbuffers::Vector<::flatbuffers::Offset<org::apache::arrow::flatbuf::KeyValue>>& metadata
1529
);

src/arrow_interface/arrow_array.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,6 @@ namespace sparrow_ipc
1616
SPARROW_ASSERT_TRUE(array->release == std::addressof(release_non_owning_arrow_array))
1717

1818
release_common_non_owning_arrow(*array);
19-
if (array->private_data != nullptr)
20-
{
21-
const auto private_data = static_cast<non_owning_arrow_array_private_data*>(array->private_data);
22-
delete private_data;
23-
array->private_data = nullptr;
24-
}
2519
array->buffers = nullptr; // The buffers were deleted with the private data
2620
}
2721

src/arrow_interface/arrow_schema.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,6 @@ namespace sparrow_ipc
99
SPARROW_ASSERT_FALSE(schema == nullptr);
1010
SPARROW_ASSERT_TRUE(schema->release == std::addressof(release_non_owning_arrow_schema));
1111
release_common_non_owning_arrow(*schema);
12-
if (schema->private_data != nullptr)
13-
{
14-
const auto private_data = static_cast<non_owning_arrow_schema_private_data*>(schema->private_data);
15-
delete private_data;
16-
schema->private_data = nullptr;
17-
}
1812
*schema = {};
1913
}
2014
}

0 commit comments

Comments
 (0)