Skip to content

Commit 45a42c8

Browse files
committed
wip
1 parent f82d723 commit 45a42c8

File tree

5 files changed

+51
-25
lines changed

5 files changed

+51
-25
lines changed

include/sparrow_ipc/deserialize_fixedsizebinary_array.hpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,24 @@ namespace sparrow_ipc
2323
const std::string format = "w:" + std::to_string(byte_width);
2424
ArrowSchema schema = make_arrow_schema(format, name.data(), metadata, std::nullopt, 0, nullptr, nullptr);
2525

26-
const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++);
27-
auto buffer_ptr = const_cast<uint8_t*>(body.data() + buffer_metadata->offset());
28-
const size_t buffer_size = buffer_metadata->length();
29-
3026
const auto bitmap_buffer_metadata = record_batch.buffers()->Get(buffer_index++);
31-
auto bitmap_ptr = const_cast<uint8_t*>(body.data() + bitmap_buffer_metadata->offset());
3227

33-
const sparrow::dynamic_bitset_view<const std::uint8_t> bitmap_view{bitmap_ptr, static_cast<size_t>(record_batch.length())};
34-
std::vector<std::uint8_t*> buffers = {buffer_ptr, bitmap_ptr};
28+
uint8_t* bitmap_ptr = nullptr;
29+
int64_t null_count = 0;
30+
31+
// A zero-length validity buffer is treated as absent: bitmap_ptr stays null and null_count stays 0
32+
if (bitmap_buffer_metadata->length() > 0) {
33+
bitmap_ptr = const_cast<uint8_t*>(body.data() + bitmap_buffer_metadata->offset());
34+
const sparrow::dynamic_bitset_view<const std::uint8_t> bitmap_view{bitmap_ptr, static_cast<size_t>(record_batch.length())};
35+
null_count = bitmap_view.null_count();
36+
}
37+
38+
const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++);
39+
auto buffer_ptr = const_cast<uint8_t*>(body.data() + buffer_metadata->offset());
40+
41+
std::vector<std::uint8_t*> buffers = {bitmap_ptr, buffer_ptr};
3542

36-
ArrowArray array = make_arrow_array(record_batch.length(), bitmap_view.null_count(), 0, std::move(buffers), 0, nullptr, nullptr);
43+
ArrowArray array = make_arrow_array(record_batch.length(), null_count, 0, std::move(buffers), 0, nullptr, nullptr);
3744

3845
sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
3946
return sparrow::fixed_width_binary_array{std::move(ap)};

include/sparrow_ipc/deserialize_primitive_array.hpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,21 @@ namespace sparrow_ipc
2727
ArrowSchema schema = make_arrow_schema(format, name.data(), metadata, std::nullopt, 0, nullptr, nullptr);
2828

2929
const auto bitmap_buffer_metadata = record_batch.buffers()->Get(buffer_index++);
30-
auto bitmap_ptr = const_cast<uint8_t*>(body.data() + bitmap_buffer_metadata->offset());
30+
uint8_t* bitmap_ptr = nullptr;
31+
int64_t null_count = 0;
32+
33+
// A zero-length validity buffer is treated as absent: bitmap_ptr stays null and null_count stays 0
34+
if (bitmap_buffer_metadata->length() > 0) {
35+
bitmap_ptr = const_cast<uint8_t*>(body.data() + bitmap_buffer_metadata->offset());
36+
const sparrow::dynamic_bitset_view<const std::uint8_t> bitmap_view{bitmap_ptr, static_cast<size_t>(record_batch.length())};
37+
null_count = bitmap_view.null_count();
38+
}
3139

3240
const auto primitive_buffer_metadata = record_batch.buffers()->Get(buffer_index++);
3341
auto primitives_ptr = const_cast<uint8_t*>(body.data() + primitive_buffer_metadata->offset());
3442

35-
const sparrow::dynamic_bitset_view<const std::uint8_t> bitmap_view{bitmap_ptr, static_cast<size_t>(record_batch.length())};
3643
std::vector<std::uint8_t*> buffers = {bitmap_ptr, primitives_ptr};
37-
ArrowArray array = make_arrow_array(record_batch.length(), bitmap_view.null_count(), 0, std::move(buffers), 0, nullptr, nullptr);
44+
ArrowArray array = make_arrow_array(record_batch.length(), null_count, 0, std::move(buffers), 0, nullptr, nullptr);
3845

3946
sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
4047
return sparrow::primitive_array<T>{std::move(ap)};

include/sparrow_ipc/deserialize_variable_size_binary_array.hpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,24 @@ namespace sparrow_ipc
2626
ArrowSchema schema = make_arrow_schema(format, name.data(), metadata, std::nullopt, 0, nullptr, nullptr);
2727

2828
const auto bitmap_buffer_metadata = record_batch.buffers()->Get(buffer_index++);
29-
auto bitmap_ptr = const_cast<uint8_t*>(body.data() + bitmap_buffer_metadata->offset());
29+
uint8_t* bitmap_ptr = nullptr;
30+
int64_t null_count = 0;
31+
32+
// A zero-length validity buffer is treated as absent: bitmap_ptr stays null and null_count stays 0
33+
if (bitmap_buffer_metadata->length() > 0) {
34+
bitmap_ptr = const_cast<uint8_t*>(body.data() + bitmap_buffer_metadata->offset());
35+
const sparrow::dynamic_bitset_view<const std::uint8_t> bitmap_view{bitmap_ptr, static_cast<size_t>(record_batch.length())};
36+
null_count = bitmap_view.null_count();
37+
}
3038

3139
const auto offset_metadata = record_batch.buffers()->Get(buffer_index++);
3240
auto offset_ptr = const_cast<uint8_t*>(body.data() + offset_metadata->offset());
33-
const size_t offset_size = offset_metadata->length();
3441

3542
const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++);
3643
auto buffer_ptr = const_cast<uint8_t*>(body.data() + buffer_metadata->offset());
37-
const size_t buffer_size = buffer_metadata->length();
3844

39-
const sparrow::dynamic_bitset_view<const std::uint8_t> bitmap_view{bitmap_ptr, static_cast<size_t>(record_batch.length())};
4045
std::vector<std::uint8_t*> buffers = {bitmap_ptr, offset_ptr, buffer_ptr};
41-
ArrowArray array = make_arrow_array(record_batch.length(), bitmap_view.null_count(), 0, std::move(buffers), 0, nullptr, nullptr);
46+
ArrowArray array = make_arrow_array(record_batch.length(), null_count, 0, std::move(buffers), 0, nullptr, nullptr);
4247

4348
sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
4449
return T{std::move(ap)};

src/encapsulated_message.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,19 @@ namespace sparrow_ipc
7272

7373
std::span<const uint8_t> EncapsulatedMessage::body() const
7474
{
75-
const uint8_t* body_ptr = m_buf_ptr + (sizeof(uint32_t) * 2) // 4 bytes continuation + 4 bytes
76-
// metadata size
77-
+ metadata_length();
75+
const size_t offset = sizeof(uint32_t) * 2 // 4 bytes continuation + 4 bytes metadata size
76+
+ metadata_length();
77+
const size_t padded_offset = (offset + 7) & ~7; // Round up to 8-byte boundary
78+
const uint8_t* body_ptr = m_buf_ptr + padded_offset;
7879
return {body_ptr, body_length()};
7980
}
8081

8182
size_t EncapsulatedMessage::total_length() const
8283
{
83-
return sizeof(uint32_t) * 2 // 4 bytes continuation + 4 bytes metadata size
84-
+ metadata_length() + body_length();
84+
const size_t offset = sizeof(uint32_t) * 2 // 4 bytes continuation + 4 bytes metadata size
85+
+ metadata_length();
86+
const size_t padded_offset = (offset + 7) & ~7; // Round up to 8-byte boundary
87+
return padded_offset + body_length();
8588
}
8689

8790
std::span<const uint8_t> EncapsulatedMessage::as_span() const

tests/test_primitive_array_with_files.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ const std::filesystem::path tests_resources_files_path = TESTS_RESOURCES_FILES_P
1919

2020
const std::vector<std::filesystem::path> files_paths_to_test = {
2121
tests_resources_files_path / "generated_primitive",
22+
tests_resources_files_path / "generated_primitive_large_offsets",
23+
tests_resources_files_path / "generated_primitive_zerolength",
24+
tests_resources_files_path / "generated_primitive_no_batches"
2225
};
2326

2427
size_t get_number_of_batches(const std::filesystem::path& json_path)
@@ -42,15 +45,15 @@ nlohmann::json load_json_file(const std::filesystem::path& json_path)
4245
return nlohmann::json::parse(json_file);
4346
}
4447

45-
TEST_SUITE("integration tests")
48+
TEST_SUITE("Integration tests")
4649
{
47-
TEST_CASE("POUET")
50+
TEST_CASE("Compare stream deserialization with JSON deserialization")
4851
{
4952
for (const auto& file_path : files_paths_to_test)
5053
{
5154
std::filesystem::path json_path = file_path;
5255
json_path.replace_extension(".json");
53-
const std::string test_name = "Testing " + json_path.filename().string();
56+
const std::string test_name = "Testing " + file_path.filename().string();
5457
SUBCASE(test_name.c_str())
5558
{
5659
// Load the JSON file
@@ -91,7 +94,8 @@ TEST_SUITE("integration tests")
9194
{
9295
for(size_t z = 0 ; z < record_batches_from_stream[i].get_column(y).size(); z++)
9396
{
94-
INFO("Comparing batch " << i << ", column " << y << ", row " << z);
97+
const auto col_name = record_batches_from_stream[i].get_column(y).name().value_or("NA");
98+
INFO("Comparing batch " << i << ", column " << y << " named :"<< col_name <<" , row " << z);
9599
REQUIRE_EQ(record_batches_from_stream[i].get_column(y).size(), record_batches_from_json[i].get_column(y).size());
96100
CHECK_EQ(record_batches_from_stream[i].get_column(y).at(z), record_batches_from_json[i].get_column(y).at(z));
97101
}

0 commit comments

Comments
 (0)