Skip to content

Commit a163900

Browse files
committed
Add owning_arrow_array_private_data
1 parent 413dffe commit a163900

File tree

5 files changed

+130
-47
lines changed

5 files changed

+130
-47
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ set(SPARROW_IPC_HEADERS
103103
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp
104104
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp
105105
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/compression.hpp
106+
#${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/decompressed_buffers.hpp
106107
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
107108
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_fixedsizebinary_array.hpp
108109
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_primitive_array.hpp
@@ -124,6 +125,7 @@ set(SPARROW_IPC_SRC
124125
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp
125126
${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp
126127
${SPARROW_IPC_SOURCE_DIR}/compression.cpp
128+
#${SPARROW_IPC_SOURCE_DIR}/decompressed_buffers.cpp
127129
${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp
128130
${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp
129131
${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp

include/sparrow_ipc/arrow_interface/arrow_array.hpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
#pragma once
32

43
#include <vector>
@@ -9,6 +8,16 @@
98

109
namespace sparrow_ipc
1110
{
11+
[[nodiscard]] SPARROW_IPC_API ArrowArray make_owning_arrow_array(
12+
int64_t length,
13+
int64_t null_count,
14+
int64_t offset,
15+
std::vector<std::vector<std::uint8_t>>&& buffers,
16+
size_t children_count,
17+
ArrowArray** children,
18+
ArrowArray* dictionary
19+
);
20+
1221
[[nodiscard]] SPARROW_IPC_API ArrowArray make_non_owning_arrow_array(
1322
int64_t length,
1423
int64_t null_count,

include/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,36 @@
77

88
namespace sparrow_ipc
99
{
10+
class owning_arrow_array_private_data
11+
{
12+
public:
13+
14+
explicit owning_arrow_array_private_data(std::vector<std::vector<std::uint8_t>>&& buffers)
15+
: m_buffers(std::move(buffers))
16+
{
17+
m_buffer_pointers.reserve(m_buffers.size());
18+
for (const auto& buffer : m_buffers)
19+
{
20+
m_buffer_pointers.push_back(buffer.data());
21+
}
22+
}
23+
24+
const void** buffers_ptrs() noexcept
25+
{
26+
return m_buffer_pointers.data();
27+
}
28+
29+
std::size_t n_buffers() const noexcept
30+
{
31+
return m_buffers.size();
32+
}
33+
34+
private:
35+
36+
std::vector<std::vector<std::uint8_t>> m_buffers;
37+
std::vector<const void*> m_buffer_pointers;
38+
};
39+
1040
class non_owning_arrow_array_private_data
1141
{
1242
public:

include/sparrow_ipc/deserialize_primitive_array.hpp

Lines changed: 30 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,6 @@
1414

1515
namespace sparrow_ipc
1616
{
17-
namespace
18-
{
19-
struct DecompressedBuffers
20-
{
21-
std::vector<uint8_t> validity_buffer;
22-
std::vector<uint8_t> data_buffer;
23-
};
24-
25-
void release_decompressed_buffers(ArrowArray* array)
26-
{
27-
if (array->private_data)
28-
{
29-
delete static_cast<DecompressedBuffers*>(array->private_data);
30-
array->private_data = nullptr;
31-
}
32-
array->release = nullptr;
33-
}
34-
}
35-
3617
template <typename T>
3718
[[nodiscard]] sparrow::primitive_array<T> deserialize_non_owning_primitive_array(
3819
const org::apache::arrow::flatbuf::RecordBatch& record_batch,
@@ -43,19 +24,15 @@ namespace sparrow_ipc
4324
)
4425
{
4526
const auto compression = record_batch.compression();
46-
DecompressedBuffers* decompressed_buffers_owner = nullptr;
4727

4828
// Validity buffer
4929
const auto validity_buffer_metadata = record_batch.buffers()->Get(buffer_index++);
5030
auto validity_buffer_span = body.subspan(validity_buffer_metadata->offset(), validity_buffer_metadata->length());
31+
std::vector<std::vector<std::uint8_t>> decompressed_buffers;
5132
if (compression)
5233
{
53-
if (!decompressed_buffers_owner)
54-
{
55-
decompressed_buffers_owner = new DecompressedBuffers();
56-
}
57-
decompressed_buffers_owner->validity_buffer = decompress(compression->codec(), validity_buffer_span);
58-
validity_buffer_span = decompressed_buffers_owner->validity_buffer;
34+
decompressed_buffers.emplace_back(decompress(compression->codec(), validity_buffer_span));
35+
validity_buffer_span = decompressed_buffers.back();
5936
}
6037
auto bitmap_ptr = const_cast<uint8_t*>(validity_buffer_span.data());
6138
const sparrow::dynamic_bitset_view<const std::uint8_t> bitmap_view{
@@ -73,12 +50,8 @@ namespace sparrow_ipc
7350
auto data_buffer_span = body.subspan(primitive_buffer_metadata->offset(), primitive_buffer_metadata->length());
7451
if (compression)
7552
{
76-
if (!decompressed_buffers_owner)
77-
{
78-
decompressed_buffers_owner = new DecompressedBuffers();
79-
}
80-
decompressed_buffers_owner->data_buffer = decompress(compression->codec(), data_buffer_span);
81-
data_buffer_span = decompressed_buffers_owner->data_buffer;
53+
decompressed_buffers.emplace_back(decompress(compression->codec(), data_buffer_span));
54+
data_buffer_span = decompressed_buffers.back();
8255
}
8356
auto primitives_ptr = const_cast<uint8_t*>(data_buffer_span.data());
8457

@@ -95,24 +68,35 @@ namespace sparrow_ipc
9568
nullptr
9669
);
9770

98-
std::vector<std::uint8_t*> buffers = {bitmap_ptr, primitives_ptr};
99-
ArrowArray array = make_non_owning_arrow_array(
100-
record_batch.length(),
101-
null_count,
102-
0,
103-
std::move(buffers),
104-
0,
105-
nullptr,
106-
nullptr
107-
);
108-
109-
if (decompressed_buffers_owner)
71+
ArrowArray array;
72+
if (compression)
73+
{
74+
array = make_owning_arrow_array(
75+
record_batch.length(),
76+
null_count,
77+
0,
78+
std::move(decompressed_buffers),
79+
0,
80+
nullptr,
81+
nullptr
82+
);
83+
}
84+
else
11085
{
111-
array.private_data = decompressed_buffers_owner;
112-
array.release = release_decompressed_buffers;
86+
std::vector<std::uint8_t*> buffers = {bitmap_ptr, primitives_ptr};
87+
array = make_non_owning_arrow_array(
88+
record_batch.length(),
89+
null_count,
90+
0,
91+
std::move(buffers),
92+
0,
93+
nullptr,
94+
nullptr
95+
);
11396
}
11497

11598
sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
11699
return sparrow::primitive_array<T>{std::move(ap)};
117100
}
118101
}
102+

src/arrow_interface/arrow_array.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,64 @@
1010

1111
namespace sparrow_ipc
1212
{
13+
void release_owning_arrow_array(ArrowArray* array)
14+
{
15+
SPARROW_ASSERT_FALSE(array == nullptr)
16+
SPARROW_ASSERT_TRUE(array->release == std::addressof(release_owning_arrow_array))
17+
18+
if (array->private_data)
19+
{
20+
delete static_cast<owning_arrow_array_private_data*>(array->private_data);
21+
array->private_data = nullptr;
22+
}
23+
24+
for (int64_t i = 0; i < array->n_children; ++i)
25+
{
26+
if (array->children[i] && array->children[i]->release)
27+
{
28+
array->children[i]->release(array->children[i]);
29+
}
30+
}
31+
delete[] array->children;
32+
array->children = nullptr;
33+
34+
if (array->dictionary && array->dictionary->release)
35+
{
36+
array->dictionary->release(array->dictionary);
37+
}
38+
delete array->dictionary;
39+
array->dictionary = nullptr;
40+
41+
array->release = nullptr;
42+
}
43+
44+
ArrowArray make_owning_arrow_array(
45+
int64_t length,
46+
int64_t null_count,
47+
int64_t offset,
48+
std::vector<std::vector<std::uint8_t>>&& buffers,
49+
size_t children_count,
50+
ArrowArray** children,
51+
ArrowArray* dictionary
52+
)
53+
{
54+
ArrowArray array{};
55+
array.length = length;
56+
array.null_count = null_count;
57+
array.offset = offset;
58+
59+
auto private_data = new owning_arrow_array_private_data(std::move(buffers));
60+
array.private_data = private_data;
61+
array.n_buffers = private_data->n_buffers();
62+
array.buffers = private_data->buffers_ptrs();
63+
64+
array.n_children = static_cast<int64_t>(children_count);
65+
array.children = children;
66+
array.dictionary = dictionary;
67+
array.release = release_owning_arrow_array;
68+
return array;
69+
}
70+
1371
void release_non_owning_arrow_array(ArrowArray* array)
1472
{
1573
SPARROW_ASSERT_FALSE(array == nullptr)

0 commit comments

Comments
 (0)