Skip to content

Commit 35ebb09

Browse files
committed
Simplify deserialization
1 parent b5cc681 commit 35ebb09

File tree

8 files changed

+137
-146
lines changed

8 files changed

+137
-146
lines changed
Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#pragma once
22
#include <concepts>
33
#include <cstdint>
4+
#include <span>
5+
#include <variant>
46
#include <vector>
57

68
#include "sparrow_ipc/config/config.hpp"
@@ -14,33 +16,18 @@ namespace sparrow_ipc
1416
{ t.n_buffers() } -> std::convertible_to<std::size_t>;
1517
};
1618

17-
class owning_arrow_array_private_data
19+
class arrow_array_private_data
1820
{
1921
public:
20-
21-
explicit owning_arrow_array_private_data(std::vector<std::vector<std::uint8_t>>&& buffers);
22+
using optionally_owned_buffer = std::variant<std::vector<uint8_t>, std::span<const uint8_t>>;
23+
explicit arrow_array_private_data(std::vector<optionally_owned_buffer>&& buffers);
2224

2325
[[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept;
2426
[[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept;
2527

2628
private:
27-
std::vector<std::vector<std::uint8_t>> m_buffers;
28-
std::vector<const void*> m_buffer_pointers;
29-
};
30-
31-
class non_owning_arrow_array_private_data
32-
{
33-
public:
3429

35-
explicit constexpr non_owning_arrow_array_private_data(std::vector<std::uint8_t*>&& buffers_pointers)
36-
: m_buffer_pointers(std::move(buffers_pointers))
37-
{
38-
}
39-
40-
[[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept;
41-
[[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept;
42-
43-
private:
44-
std::vector<std::uint8_t*> m_buffer_pointers;
30+
std::vector<optionally_owned_buffer> m_buffers;
31+
std::vector<const void*> m_buffer_pointers;
4532
};
4633
}

include/sparrow_ipc/arrow_interface/arrow_array_schema_common_release.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ namespace sparrow_ipc
2020
{
2121
using private_data_type = std::conditional_t<
2222
std::same_as<T, ArrowArray>,
23-
non_owning_arrow_array_private_data,
23+
arrow_array_private_data,
2424
non_owning_arrow_schema_private_data>;
2525
if (t.release == nullptr)
2626
{

include/sparrow_ipc/deserialize_primitive_array.hpp

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -36,43 +36,46 @@ namespace sparrow_ipc
3636
);
3737

3838
const auto compression = record_batch.compression();
39-
std::vector<std::vector<std::uint8_t>> decompressed_buffers;
40-
41-
// TODO do not decompress validity buffers?
42-
auto validity_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);
43-
44-
const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length());
45-
46-
auto data_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);
47-
48-
ArrowArray array;
39+
std::vector<arrow_array_private_data::optionally_owned_buffer> buffers;
40+
// TODO: should validity buffers be excluded from compression/decompression?
4941
if (compression)
5042
{
51-
array = make_arrow_array<owning_arrow_array_private_data>(
52-
record_batch.length(),
53-
null_count,
54-
0,
55-
0,
56-
nullptr,
57-
nullptr,
58-
std::move(decompressed_buffers)
59-
);
43+
// Validity buffer
44+
buffers.push_back(utils::get_decompressed_buffer(record_batch, body, buffer_index, compression));
45+
// Data buffer
46+
buffers.push_back(utils::get_decompressed_buffer(record_batch, body, buffer_index, compression));
6047
}
6148
else
6249
{
63-
auto primitives_ptr = const_cast<uint8_t*>(data_buffer_span.data());
64-
std::vector<std::uint8_t*> buffers = {bitmap_ptr, primitives_ptr};
65-
array = make_arrow_array<non_owning_arrow_array_private_data>(
66-
record_batch.length(),
67-
null_count,
68-
0,
69-
0,
70-
nullptr,
71-
nullptr,
72-
std::move(buffers)
73-
);
50+
// Validity buffer
51+
buffers.push_back(utils::get_buffer(record_batch, body, buffer_index));
52+
// Data buffer
53+
buffers.push_back(utils::get_buffer(record_batch, body, buffer_index));
7454
}
7555

56+
std::span<const uint8_t> validity_buffer_span;
57+
if (std::holds_alternative<std::span<const uint8_t>>(buffers[0]))
58+
{
59+
validity_buffer_span = std::get<std::span<const uint8_t>>(buffers[0]);
60+
}
61+
else
62+
{
63+
validity_buffer_span = std::span<const uint8_t>(std::get<std::vector<uint8_t>>(buffers[0]));
64+
}
65+
66+
// TODO: bitmap_ptr is no longer used. Keep it for now; remove it once confirmed unnecessary.
67+
const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length());
68+
69+
ArrowArray array = make_arrow_array<arrow_array_private_data>(
70+
record_batch.length(),
71+
null_count,
72+
0,
73+
0,
74+
nullptr,
75+
nullptr,
76+
std::move(buffers)
77+
);
78+
7679
sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
7780
return sparrow::primitive_array<T>{std::move(ap)};
7881
}

include/sparrow_ipc/deserialize_utils.hpp

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,24 +59,27 @@ namespace sparrow_ipc::utils
5959
* @brief Extracts a buffer from a RecordBatch and decompresses it if necessary.
6060
*
6161
* This function retrieves a buffer span from the specified index, increments the index,
62-
* and applies decompression if specified. If the buffer is decompressed, the new
63-
* data is stored in `decompressed_storage` and the returned span will point to this new data.
62+
* and applies decompression if specified.
6463
*
6564
* @param record_batch The Arrow RecordBatch containing buffer metadata.
6665
* @param body The raw buffer data as a byte span.
6766
* @param buffer_index The index of the buffer to retrieve. This value is incremented by the function.
6867
* @param compression The compression algorithm to use. If nullptr, no decompression is performed.
69-
* @param decompressed_storage A vector that will be used to store the data of any decompressed buffers.
7068
*
71-
* @return A span viewing the resulting buffer data. This will be a view of the original
72-
* `body` if no decompression occurs, or a view of the newly added buffer in
73-
* `decompressed_storage` if decompression occurs.
69+
* @return A `std::variant` containing either:
70+
* - A `std::vector<std::uint8_t>` if the buffer was decompressed, owning the newly allocated data.
71+
* - A `std::span<const std::uint8_t>` if no decompression occurred, providing a view of the original `body`.
7472
*/
75-
[[nodiscard]] std::span<const uint8_t> get_and_decompress_buffer(
73+
[[nodiscard]] std::variant<std::vector<std::uint8_t>, std::span<const std::uint8_t>> get_decompressed_buffer(
7674
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
7775
std::span<const uint8_t> body,
7876
size_t& buffer_index,
79-
const org::apache::arrow::flatbuf::BodyCompression* compression,
80-
std::vector<std::vector<uint8_t>>& decompressed_storage
77+
const org::apache::arrow::flatbuf::BodyCompression* compression
78+
);
79+
80+
[[nodiscard]] std::span<const uint8_t> get_buffer(
81+
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
82+
std::span<const uint8_t> body,
83+
size_t& buffer_index
8184
);
8285
}

include/sparrow_ipc/deserialize_variable_size_binary_array.hpp

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -33,44 +33,49 @@ namespace sparrow_ipc
3333
);
3434

3535
const auto compression = record_batch.compression();
36-
std::vector<std::vector<std::uint8_t>> decompressed_buffers;
37-
38-
auto validity_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);
39-
40-
const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length());
41-
42-
auto offset_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);
43-
auto data_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);
44-
45-
ArrowArray array;
36+
std::vector<arrow_array_private_data::optionally_owned_buffer> buffers;
4637
if (compression)
4738
{
48-
array = make_arrow_array<owning_arrow_array_private_data>(
49-
record_batch.length(),
50-
null_count,
51-
0,
52-
0,
53-
nullptr,
54-
nullptr,
55-
std::move(decompressed_buffers)
56-
);
39+
// Validity buffer
40+
buffers.push_back(utils::get_decompressed_buffer(record_batch, body, buffer_index, compression));
41+
// Offset buffer
42+
buffers.push_back(utils::get_decompressed_buffer(record_batch, body, buffer_index, compression));
43+
// Data buffer
44+
buffers.push_back(utils::get_decompressed_buffer(record_batch, body, buffer_index, compression));
5745
}
5846
else
5947
{
60-
auto offset_ptr = const_cast<uint8_t*>(offset_buffer_span.data());
61-
auto buffer_ptr = const_cast<uint8_t*>(data_buffer_span.data());
62-
std::vector<std::uint8_t*> buffers = {bitmap_ptr, offset_ptr, buffer_ptr};
63-
array = make_arrow_array<non_owning_arrow_array_private_data>(
64-
record_batch.length(),
65-
null_count,
66-
0,
67-
0,
68-
nullptr,
69-
nullptr,
70-
std::move(buffers)
71-
);
48+
// Validity buffer
49+
buffers.push_back(utils::get_buffer(record_batch, body, buffer_index));
50+
// Offset buffer
51+
buffers.push_back(utils::get_buffer(record_batch, body, buffer_index));
52+
// Data buffer
53+
buffers.push_back(utils::get_buffer(record_batch, body, buffer_index));
7254
}
7355

56+
std::span<const uint8_t> validity_buffer_span;
57+
if (std::holds_alternative<std::span<const uint8_t>>(buffers[0]))
58+
{
59+
validity_buffer_span = std::get<std::span<const uint8_t>>(buffers[0]);
60+
}
61+
else
62+
{
63+
validity_buffer_span = std::span<const uint8_t>(std::get<std::vector<uint8_t>>(buffers[0]));
64+
}
65+
66+
// TODO: bitmap_ptr is no longer used. Keep it for now; remove it once confirmed unnecessary.
67+
const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length());
68+
69+
ArrowArray array = make_arrow_array<arrow_array_private_data>(
70+
record_batch.length(),
71+
null_count,
72+
0,
73+
0,
74+
nullptr,
75+
nullptr,
76+
std::move(buffers)
77+
);
78+
7479
sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
7580
return T{std::move(ap)};
7681
}

src/arrow_interface/arrow_array/private_data.cpp

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,36 @@
22

33
namespace sparrow_ipc
44
{
5-
owning_arrow_array_private_data::owning_arrow_array_private_data(std::vector<std::vector<std::uint8_t>>&& buffers)
5+
arrow_array_private_data::arrow_array_private_data(std::vector<optionally_owned_buffer>&& buffers)
66
: m_buffers(std::move(buffers))
77
{
88
m_buffer_pointers.reserve(m_buffers.size());
99
for (const auto& buffer : m_buffers)
1010
{
11-
m_buffer_pointers.push_back(buffer.data());
11+
std::visit(
12+
[this](auto&& arg)
13+
{
14+
if (arg.empty())
15+
{
16+
m_buffer_pointers.push_back(nullptr);
17+
}
18+
else
19+
{
20+
m_buffer_pointers.push_back(arg.data());
21+
}
22+
},
23+
buffer
24+
);
1225
}
1326
}
1427

15-
const void** owning_arrow_array_private_data::buffers_ptrs() noexcept
28+
const void** arrow_array_private_data::buffers_ptrs() noexcept
1629
{
1730
return m_buffer_pointers.data();
1831
}
1932

20-
std::size_t owning_arrow_array_private_data::n_buffers() const noexcept
33+
std::size_t arrow_array_private_data::n_buffers() const noexcept
2134
{
2235
return m_buffers.size();
2336
}
24-
25-
const void** non_owning_arrow_array_private_data::buffers_ptrs() noexcept
26-
{
27-
return const_cast<const void**>(reinterpret_cast<void**>(m_buffer_pointers.data()));
28-
}
29-
30-
std::size_t non_owning_arrow_array_private_data::n_buffers() const noexcept
31-
{
32-
return m_buffer_pointers.size();
33-
}
3437
}

src/deserialize_fixedsizebinary_array.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@ namespace sparrow_ipc
2222
nullptr,
2323
nullptr
2424
);
25-
const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(
26-
record_batch,
27-
body,
28-
buffer_index++
29-
);
30-
const auto buffer_metadata = record_batch.buffers()->Get(buffer_index++);
31-
if ((body.size() < (buffer_metadata->offset() + buffer_metadata->length())))
32-
{
33-
throw std::runtime_error("Data buffer exceeds body size");
34-
}
35-
auto buffer_ptr = const_cast<uint8_t*>(body.data() + buffer_metadata->offset());
36-
std::vector<std::uint8_t*> buffers = {bitmap_ptr, buffer_ptr};
37-
ArrowArray array = make_arrow_array<non_owning_arrow_array_private_data>(
25+
26+
std::vector<arrow_array_private_data::optionally_owned_buffer> buffers;
27+
// Validity buffer
28+
buffers.push_back(utils::get_buffer(record_batch, body, buffer_index));
29+
// Data buffer
30+
buffers.push_back(utils::get_buffer(record_batch, body, buffer_index));
31+
32+
auto validity_buffer_span = std::get<std::span<const uint8_t>>(buffers[0]);
33+
34+
// TODO: bitmap_ptr is no longer used. Keep it for now; remove it once confirmed unnecessary.
35+
const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(validity_buffer_span, record_batch.length());
36+
37+
ArrowArray array = make_arrow_array<arrow_array_private_data>(
3838
record_batch.length(),
3939
null_count,
4040
0,

0 commit comments

Comments
 (0)