Skip to content

Commit 6eb449e

Browse files
committed
Factorize
1 parent a163900 commit 6eb449e

File tree

9 files changed

+226
-281
lines changed

9 files changed

+226
-281
lines changed
Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,86 @@
11
#pragma once
22

3-
#include <vector>
3+
#include <utility>
44

55
#include <sparrow/c_interface.hpp>
6+
#include <sparrow/utils/contracts.hpp>
67

78
#include "sparrow_ipc/config/config.hpp"
9+
#include "sparrow_ipc/arrow_interface/arrow_array/private_data.hpp"
810

911
namespace sparrow_ipc
1012
{
11-
[[nodiscard]] SPARROW_IPC_API ArrowArray make_owning_arrow_array(
12-
int64_t length,
13-
int64_t null_count,
14-
int64_t offset,
15-
std::vector<std::vector<std::uint8_t>>&& buffers,
16-
size_t children_count,
17-
ArrowArray** children,
18-
ArrowArray* dictionary
19-
);
13+
// Declared here; the definition is not part of this header.
// Called by arrow_array_release<T> after the private data has been deleted.
// NOTE(review): presumably releases `array->children` and `array->dictionary` -- confirm against the definition.
SPARROW_IPC_API void release_arrow_array_children_and_dictionary(ArrowArray* array);
14+
15+
/**
 * @brief Release callback for an ArrowArray whose private data is a heap-allocated `T`.
 *
 * Deletes the `T` stored in `array->private_data`, nulls `private_data` and `buffers`,
 * calls release_arrow_array_children_and_dictionary(), and finally sets `release` to nullptr.
 *
 * @tparam T Private-data type satisfying ArrowPrivateData; `array->private_data` is cast to `T*`.
 * @param array Array to release. Asserted to be non-null, to have `release` pointing at this
 *              very instantiation, and to have a non-null `private_data`.
 */
template <ArrowPrivateData T>
16+
void arrow_array_release(ArrowArray* array)
17+
{
18+
// NOTE(review): the next two assertions have no trailing ';' while the third one does --
// confirm SPARROW_ASSERT_TRUE expands to a complete statement, or add the semicolons.
SPARROW_ASSERT_TRUE(array != nullptr)
19+
// Guards against releasing with a `T` different from the one used to build the array.
SPARROW_ASSERT_TRUE(array->release == std::addressof(arrow_array_release<T>))
20+
21+
SPARROW_ASSERT_TRUE(array->private_data != nullptr);
2022

21-
[[nodiscard]] SPARROW_IPC_API ArrowArray make_non_owning_arrow_array(
23+
delete static_cast<T*>(array->private_data);
24+
array->private_data = nullptr;
25+
array->buffers = nullptr; // The buffers were deleted with the private data
26+
27+
release_arrow_array_children_and_dictionary(array);
28+
// Cleared last: a null `release` is what marks the array as released.
array->release = nullptr;
29+
}
30+
31+
/**
 * @brief Fills an existing ArrowArray and attaches a freshly allocated `T` as its private data.
 *
 * Copies the scalar fields, stores `children` and `dictionary` as given (pointers are not
 * copied deeply), allocates `new T(std::forward<Arg>(private_data_arg))`, takes `n_buffers`
 * and `buffers` from that object, and installs arrow_array_release<T> as the release callback.
 *
 * @tparam T   Private-data type satisfying ArrowPrivateData.
 * @tparam Arg Type of the single argument forwarded to `T`'s constructor.
 * @param array            Array to fill; its fields are overwritten.
 * @param length           Asserted to be >= 0.
 * @param null_count       Asserted to be >= -1.
 * @param offset           Asserted to be >= 0.
 * @param children_count   Stored in `n_children` after a cast to int64_t.
 * @param children         Stored in `array.children`.
 * @param dictionary       Stored in `array.dictionary`.
 * @param private_data_arg Forwarded to the constructor of `T`.
 *
 * NOTE(review): SPARROW_IPC_API is applied to a function template defined in a header --
 * confirm the export macro is intended here.
 */
template <ArrowPrivateData T, typename Arg>
32+
SPARROW_IPC_API void fill_arrow_array(
33+
ArrowArray& array,
2234
int64_t length,
2335
int64_t null_count,
2436
int64_t offset,
25-
std::vector<std::uint8_t*>&& buffers,
2637
size_t children_count,
2738
ArrowArray** children,
28-
ArrowArray* dictionary
29-
);
39+
ArrowArray* dictionary,
40+
Arg&& private_data_arg
41+
)
42+
{
43+
SPARROW_ASSERT_TRUE(length >= 0);
44+
SPARROW_ASSERT_TRUE(null_count >= -1);
45+
SPARROW_ASSERT_TRUE(offset >= 0);
3046

31-
SPARROW_IPC_API void release_non_owning_arrow_array(ArrowArray* array);
47+
array.length = length;
48+
array.null_count = null_count;
49+
array.offset = offset;
50+
array.n_children = static_cast<int64_t>(children_count);
51+
array.children = children;
52+
array.dictionary = dictionary;
3253

33-
SPARROW_IPC_API void fill_non_owning_arrow_array(
34-
ArrowArray& array,
54+
// Raw allocation: the matching `delete` is in arrow_array_release<T>.
auto private_data = new T(std::forward<Arg>(private_data_arg));
55+
array.private_data = private_data;
56+
array.n_buffers = private_data->n_buffers();
57+
// The buffer-pointer table lives inside the private data object.
array.buffers = private_data->buffers_ptrs();
58+
59+
array.release = &arrow_array_release<T>;
60+
}
61+
62+
/**
 * @brief Builds and returns an ArrowArray by delegating to fill_arrow_array<T>.
 *
 * Value-initializes a local ArrowArray, passes every argument through to
 * fill_arrow_array<T> (forwarding `private_data_arg`), and returns the array by value.
 *
 * @tparam T   Private-data type satisfying ArrowPrivateData.
 * @tparam Arg Type of the argument forwarded to `T`'s constructor.
 * @return The filled ArrowArray.
 *
 * NOTE(review): SPARROW_IPC_API is applied to a function template defined in a header --
 * confirm the export macro is intended here.
 */
template <ArrowPrivateData T, typename Arg>
63+
[[nodiscard]] SPARROW_IPC_API ArrowArray make_arrow_array(
3564
int64_t length,
3665
int64_t null_count,
3766
int64_t offset,
38-
std::vector<std::uint8_t*>&& buffers,
3967
size_t children_count,
4068
ArrowArray** children,
41-
ArrowArray* dictionary
42-
);
43-
}
69+
ArrowArray* dictionary,
70+
Arg&& private_data_arg
71+
)
72+
{
73+
ArrowArray array{};
74+
fill_arrow_array<T>(
75+
array,
76+
length,
77+
null_count,
78+
offset,
79+
children_count,
80+
children,
81+
dictionary,
82+
std::forward<Arg>(private_data_arg)
83+
);
84+
return array;
85+
}
86+
}

include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
#pragma once
2-
2+
#include <concepts>
33
#include <cstdint>
44
#include <vector>
55

66
#include "sparrow_ipc/config/config.hpp"
77

88
namespace sparrow_ipc
99
{
10+
/**
 * @brief Requirements on a type used as ArrowArray private data.
 *
 * On a non-const lvalue `t`:
 *  - `t.buffers_ptrs()` must return exactly `const void**`;
 *  - `t.n_buffers()` must return something convertible to `std::size_t`.
 */
template <typename T>
11+
concept ArrowPrivateData = requires(T& t)
12+
{
13+
{ t.buffers_ptrs() } -> std::same_as<const void**>;
14+
{ t.n_buffers() } -> std::convertible_to<std::size_t>;
15+
};
16+
1017
class owning_arrow_array_private_data
1118
{
1219
public:
@@ -21,12 +28,12 @@ namespace sparrow_ipc
2128
}
2229
}
2330

24-
const void** buffers_ptrs() noexcept
31+
// Returns the start of the `m_buffer_pointers` storage as a `const void**`.
[[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept
2532
{
2633
return m_buffer_pointers.data();
2734
}
2835

29-
std::size_t n_buffers() const noexcept
36+
// Returns the number of entries in `m_buffers`.
[[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept
3037
{
3138
return m_buffers.size();
3239
}
@@ -42,14 +49,22 @@ namespace sparrow_ipc
4249
public:
4350

4451
// Takes the vector of buffer pointers by rvalue and moves it into the member;
// only the pointers are stored.
// NOTE(review): presumably the pointed-to bytes stay owned by the caller -- confirm.
explicit constexpr non_owning_arrow_array_private_data(std::vector<std::uint8_t*>&& buffers_pointers)
45-
: m_buffers_pointers(std::move(buffers_pointers))
52+
: m_buffer_pointers(std::move(buffers_pointers))
4653
{
4754
}
4855

49-
[[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept;
56+
// Returns the stored pointer table viewed as `const void**`.
// NOTE(review): reinterprets `std::uint8_t**` as `void**` before adding const --
// confirm this cast is acceptable on all supported platforms.
[[nodiscard]] SPARROW_IPC_API const void** buffers_ptrs() noexcept
57+
{
58+
return const_cast<const void**>(reinterpret_cast<void**>(m_buffer_pointers.data()));
59+
}
60+
61+
// Returns the number of stored buffer pointers.
[[nodiscard]] SPARROW_IPC_API std::size_t n_buffers() const noexcept
62+
{
63+
return m_buffer_pointers.size();
64+
}
5065

5166
private:
5267

53-
std::vector<std::uint8_t*> m_buffers_pointers;
68+
std::vector<std::uint8_t*> m_buffer_pointers;
5469
};
5570
}

include/sparrow_ipc/deserialize_primitive_array.hpp

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
#include "Message_generated.h"
1010
#include "sparrow_ipc/arrow_interface/arrow_array.hpp"
1111
#include "sparrow_ipc/arrow_interface/arrow_schema.hpp"
12-
#include "sparrow_ipc/compression.hpp"
1312
#include "sparrow_ipc/deserialize_utils.hpp"
1413

1514
namespace sparrow_ipc
@@ -23,38 +22,6 @@ namespace sparrow_ipc
2322
size_t& buffer_index
2423
)
2524
{
26-
const auto compression = record_batch.compression();
27-
28-
// Validity buffer
29-
const auto validity_buffer_metadata = record_batch.buffers()->Get(buffer_index++);
30-
auto validity_buffer_span = body.subspan(validity_buffer_metadata->offset(), validity_buffer_metadata->length());
31-
std::vector<std::vector<std::uint8_t>> decompressed_buffers;
32-
if (compression)
33-
{
34-
decompressed_buffers.emplace_back(decompress(compression->codec(), validity_buffer_span));
35-
validity_buffer_span = decompressed_buffers.back();
36-
}
37-
auto bitmap_ptr = const_cast<uint8_t*>(validity_buffer_span.data());
38-
const sparrow::dynamic_bitset_view<const std::uint8_t> bitmap_view{
39-
bitmap_ptr,
40-
static_cast<size_t>(record_batch.length())};
41-
auto null_count = bitmap_view.null_count();
42-
if (validity_buffer_metadata->length() == 0)
43-
{
44-
bitmap_ptr = nullptr;
45-
null_count = 0;
46-
}
47-
48-
// Data buffer
49-
const auto primitive_buffer_metadata = record_batch.buffers()->Get(buffer_index++);
50-
auto data_buffer_span = body.subspan(primitive_buffer_metadata->offset(), primitive_buffer_metadata->length());
51-
if (compression)
52-
{
53-
decompressed_buffers.emplace_back(decompress(compression->codec(), data_buffer_span));
54-
data_buffer_span = decompressed_buffers.back();
55-
}
56-
auto primitives_ptr = const_cast<uint8_t*>(data_buffer_span.data());
57-
5825
const std::string_view format = data_type_to_format(
5926
sparrow::detail::get_data_type_from_array<sparrow::primitive_array<T>>::get()
6027
);
@@ -68,35 +35,54 @@ namespace sparrow_ipc
6835
nullptr
6936
);
7037

38+
const auto compression = record_batch.compression();
39+
std::vector<std::vector<std::uint8_t>> decompressed_buffers;
40+
41+
auto validity_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);
42+
43+
uint8_t* bitmap_ptr = nullptr;
44+
int64_t null_count = 0;
45+
46+
if (validity_buffer_span.size() > 0)
47+
{
48+
bitmap_ptr = const_cast<uint8_t*>(validity_buffer_span.data());
49+
const sparrow::dynamic_bitset_view<const std::uint8_t> bitmap_view{
50+
bitmap_ptr,
51+
static_cast<size_t>(record_batch.length())};
52+
null_count = bitmap_view.null_count();
53+
}
54+
55+
auto data_buffer_span = utils::get_and_decompress_buffer(record_batch, body, buffer_index, compression, decompressed_buffers);
56+
7157
ArrowArray array;
7258
if (compression)
7359
{
74-
array = make_owning_arrow_array(
60+
array = make_arrow_array<owning_arrow_array_private_data>(
7561
record_batch.length(),
7662
null_count,
7763
0,
78-
std::move(decompressed_buffers),
7964
0,
8065
nullptr,
81-
nullptr
66+
nullptr,
67+
std::move(decompressed_buffers)
8268
);
8369
}
8470
else
8571
{
72+
auto primitives_ptr = const_cast<uint8_t*>(data_buffer_span.data());
8673
std::vector<std::uint8_t*> buffers = {bitmap_ptr, primitives_ptr};
87-
array = make_non_owning_arrow_array(
74+
array = make_arrow_array<non_owning_arrow_array_private_data>(
8875
record_batch.length(),
8976
null_count,
9077
0,
91-
std::move(buffers),
9278
0,
9379
nullptr,
94-
nullptr
80+
nullptr,
81+
std::move(buffers)
9582
);
9683
}
9784

9885
sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
9986
return sparrow::primitive_array<T>{std::move(ap)};
10087
}
10188
}
102-

include/sparrow_ipc/deserialize_utils.hpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
#include <span>
44
#include <utility>
5+
#include <vector>
56

67
#include <sparrow/buffer/dynamic_bitset/dynamic_bitset_view.hpp>
78
#include <sparrow/u8_buffer.hpp>
89

910
#include "Message_generated.h"
10-
#include "Schema_generated.h"
1111

1212
namespace sparrow_ipc::utils
1313
{
@@ -33,4 +33,29 @@ namespace sparrow_ipc::utils
3333
std::span<const uint8_t> body,
3434
size_t index
3535
);
36-
}
36+
37+
/**
38+
* @brief Extracts a buffer from a RecordBatch and decompresses it if necessary.
39+
*
40+
* This function retrieves a buffer span from the specified index, increments the index,
41+
* and applies decompression if specified. If the buffer is decompressed, the new
42+
* data is stored in `decompressed_storage` and the returned span will point to this new data.
43+
*
44+
* @param record_batch The Arrow RecordBatch containing buffer metadata.
45+
* @param body The raw buffer data as a byte span.
46+
* @param buffer_index The index of the buffer to retrieve. This value is incremented by the function.
47+
* @param compression The body-compression metadata of the record batch (e.g. `record_batch.compression()`). If nullptr, no decompression is performed.
48+
* @param decompressed_storage A vector that will be used to store the data of any decompressed buffers.
49+
*
50+
* @return A span viewing the resulting buffer data. This will be a view of the original
51+
* `body` if no decompression occurs, or a view of the newly added buffer in
52+
* `decompressed_storage` if decompression occurs.
53+
*/
54+
[[nodiscard]] std::span<const uint8_t> get_and_decompress_buffer(
55+
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
56+
std::span<const uint8_t> body,
57+
size_t& buffer_index,
58+
const org::apache::arrow::flatbuf::BodyCompression* compression,
59+
std::vector<std::vector<uint8_t>>& decompressed_storage
60+
);
61+
}

0 commit comments

Comments
 (0)