Skip to content

Commit 0762a66

Browse files
Use a single global buffer for each column's buffers when translating to Arrow
1 parent 428bf0e commit 0762a66

File tree

2 files changed

+128
-85
lines changed

2 files changed

+128
-85
lines changed

libtiledbsoma/src/utils/arrow_adapter.cc

Lines changed: 122 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -116,46 +116,74 @@ std::pair<enum ArrowType, enum ArrowTimeUnit> to_nanoarrow_time(std::string_view
116116
}
117117

118118
ArrowBuffer::ArrowBuffer(ReadColumnBuffer* buffer, bool large_offsets) {
119+
// Compute total size for all buffers as well as the offsets for each section within each buffer
120+
// Each buffer section should be 64-bit aligned
121+
122+
size_t data_byte_size = 0;
123+
size_t offset_byte_size = 0;
124+
size_t validity_byte_size = 0;
125+
126+
size_t offsets_byte_offset = 0;
127+
size_t validity_byte_offset = 0;
128+
119129
if (buffer->is_var()) {
120-
size_t data_byte_size = buffer->offsets()[buffer->size()];
121-
data_ = std::make_unique_for_overwrite<std::byte[]>(data_byte_size);
130+
data_byte_size = buffer->offsets()[buffer->size()];
122131

123132
if (large_offsets) {
124-
size_t offset_byte_size = (buffer->size() + 1) * sizeof(int64_t);
125-
large_offsets_ = std::make_unique_for_overwrite<int64_t[]>(buffer->size() + 1);
126-
std::memcpy(large_offsets_.get(), buffer->offsets().data(), offset_byte_size);
133+
offset_byte_size = (buffer->size() + 1) * sizeof(int64_t);
127134
} else {
128-
small_offsets_ = std::make_unique_for_overwrite<int32_t[]>(buffer->is_var() + 1);
135+
offset_byte_size = (buffer->size() + 1) * sizeof(int32_t);
136+
}
137+
138+
offsets_byte_offset = ((data_byte_size + 7) / 8) * 8;
139+
validity_byte_offset = ((offsets_byte_offset + offset_byte_size + 7) / 8) * 8;
140+
} else {
141+
if (buffer->type() == TILEDB_BOOL) {
142+
data_byte_size = (buffer->size() + 7) / 8;
143+
} else {
144+
data_byte_size = buffer->size() * tiledb::impl::type_size(buffer->type());
145+
}
146+
147+
validity_byte_offset = ((data_byte_size + 7) / 8) * 8;
148+
}
149+
150+
if (buffer->is_nullable()) {
151+
validity_byte_size = (buffer->size() + 7) / 8;
152+
}
153+
154+
// Allocate global buffer
155+
global_buffer_ = std::make_unique_for_overwrite<std::byte[]>(validity_byte_offset + validity_byte_size);
156+
157+
data_ = std::span<std::byte>(global_buffer_.get(), data_byte_size);
158+
159+
if (buffer->is_var()) {
160+
if (large_offsets) {
161+
large_offsets_ = std::span<int64_t>(
162+
reinterpret_cast<int64_t*>(global_buffer_.get() + offsets_byte_offset), buffer->size() + 1);
163+
std::memcpy(large_offsets_.data(), buffer->offsets().data(), offset_byte_size);
164+
} else {
165+
small_offsets_ = std::span<int32_t>(
166+
reinterpret_cast<int32_t*>(global_buffer_.get() + offsets_byte_offset), buffer->size() + 1);
129167
auto offsets = buffer->offsets();
130168
for (size_t i = 0; i < offsets.size(); ++i) {
131169
small_offsets_[i] = static_cast<int32_t>(offsets[i]);
132170
}
133171
}
134172

135-
std::memcpy(data_.get(), buffer->data().data(), data_byte_size);
173+
std::memcpy(data_.data(), buffer->data().data(), data_byte_size);
136174
} else {
137175
if (buffer->type() == TILEDB_BOOL) {
138-
size_t data_byte_size = (buffer->size() + 7) / 8;
139-
140-
data_ = std::make_unique_for_overwrite<std::byte[]>(data_byte_size);
141-
buffer->data_to_bitmap();
142-
143-
std::memcpy(data_.get(), buffer->data().data(), data_byte_size);
176+
ColumnBuffer::to_bitmap(
177+
buffer->data<uint8_t>(), std::span<uint8_t>(reinterpret_cast<uint8_t*>(data_.data()), data_.size()));
144178
} else {
145-
size_t data_byte_size = buffer->size() * tiledb::impl::type_size(buffer->type());
146-
147-
data_ = std::make_unique_for_overwrite<std::byte[]>(data_byte_size);
148-
149-
std::memcpy(data_.get(), buffer->data().data(), data_byte_size);
179+
std::memcpy(data_.data(), buffer->data().data(), data_byte_size);
150180
}
151181
}
152182

153183
if (buffer->is_nullable()) {
154-
buffer->validity_to_bitmap();
155-
auto bitmap_size = (buffer->size() + 7) / 8;
156-
157-
validity_ = std::make_unique_for_overwrite<std::byte[]>(bitmap_size);
158-
std::memcpy(validity_.get(), buffer->validity().data(), bitmap_size);
184+
validity_ = std::span<uint8_t>(
185+
reinterpret_cast<uint8_t*>(global_buffer_.get() + validity_byte_offset), validity_byte_size);
186+
buffer->validity_to_bitmap(validity_);
159187
}
160188

161189
length = buffer->size();
@@ -165,12 +193,15 @@ ArrowBuffer::ArrowBuffer(ReadColumnBuffer* buffer, bool large_offsets) {
165193
ArrowBuffer::ArrowBuffer(const Enumeration& enumeration, bool large_offsets) {
166194
Context ctx = enumeration.context();
167195

168-
const void* data;
169-
uint64_t data_size;
170-
ctx.handle_error(tiledb_enumeration_get_data(ctx.ptr().get(), enumeration.ptr().get(), &data, &data_size));
196+
size_t data_byte_size = 0;
197+
size_t offsets_byte_size = 0;
198+
size_t offsets_byte_offset = 0;
199+
200+
const void* offsets;
201+
uint64_t offsets_size;
171202

172-
data_ = std::make_unique_for_overwrite<std::byte[]>(data_size);
173-
std::memcpy(data_.get(), data, data_size);
203+
const void* data;
204+
ctx.handle_error(tiledb_enumeration_get_data(ctx.ptr().get(), enumeration.ptr().get(), &data, &data_byte_size));
174205

175206
switch (enumeration.type()) {
176207
case TILEDB_CHAR:
@@ -179,74 +210,82 @@ ArrowBuffer::ArrowBuffer(const Enumeration& enumeration, bool large_offsets) {
179210
case TILEDB_BLOB:
180211
case TILEDB_GEOM_WKT:
181212
case TILEDB_GEOM_WKB: {
182-
const void* offsets;
183-
uint64_t offsets_size;
184213
ctx.handle_error(
185214
tiledb_enumeration_get_offsets(ctx.ptr().get(), enumeration.ptr().get(), &offsets, &offsets_size));
215+
186216
size_t count = offsets_size / sizeof(uint64_t);
187217

188218
if (large_offsets) {
189-
large_offsets_ = std::make_unique_for_overwrite<int64_t[]>(count + 1);
190-
std::memcpy(large_offsets_.get(), offsets, offsets_size);
191-
large_offsets_[count] = data_size;
219+
offsets_byte_size = (count + 1) * sizeof(int64_t);
192220
} else {
193-
small_offsets_ = std::make_unique_for_overwrite<int32_t[]>(count + 1);
194-
std::span<const uint64_t> offsets_v(static_cast<const uint64_t*>(offsets), count);
195-
for (size_t i = 0; i < count; ++i) {
221+
offsets_byte_size = (count + 1) * sizeof(int32_t);
222+
}
223+
224+
offsets_byte_offset = ((data_byte_size + 7) / 8) * 8;
225+
} break;
226+
case TILEDB_BOOL:
227+
offsets_byte_offset = 1;
228+
break;
229+
default:
230+
offsets_byte_offset = data_byte_size;
231+
}
232+
233+
global_buffer_ = std::make_unique_for_overwrite<std::byte[]>(offsets_byte_offset + offsets_byte_size);
234+
235+
data_ = std::span<std::byte>(global_buffer_.get(), data_byte_size);
236+
std::memcpy(data_.data(), data, data_byte_size);
237+
238+
switch (enumeration.type()) {
239+
case TILEDB_CHAR:
240+
case TILEDB_STRING_ASCII:
241+
case TILEDB_STRING_UTF8:
242+
case TILEDB_BLOB:
243+
case TILEDB_GEOM_WKT:
244+
case TILEDB_GEOM_WKB: {
245+
length = offsets_size / sizeof(uint64_t);
246+
247+
if (large_offsets) {
248+
large_offsets_ = std::span<int64_t>(
249+
reinterpret_cast<int64_t*>(global_buffer_.get() + offsets_byte_offset), length + 1);
250+
std::memcpy(large_offsets_.data(), offsets, offsets_size);
251+
large_offsets_[length] = data_byte_size;
252+
} else {
253+
small_offsets_ = std::span<int32_t>(
254+
reinterpret_cast<int32_t*>(global_buffer_.get() + offsets_byte_offset), length + 1);
255+
std::span<const uint64_t> offsets_v(static_cast<const uint64_t*>(offsets), length);
256+
for (size_t i = 0; i < length; ++i) {
196257
small_offsets_[i] = static_cast<int32_t>(offsets_v[i]);
197258
}
198-
small_offsets_[count] = static_cast<int32_t>(data_size);
259+
small_offsets_[length] = static_cast<int32_t>(data_byte_size);
199260
}
200-
201-
length = count;
202261
} break;
203262
case TILEDB_BOOL: {
204-
data_ = std::make_unique_for_overwrite<std::byte[]>(1);
205-
std::span<const bool> data_v(static_cast<const bool*>(data), data_size);
206-
size_t count = data_size / sizeof(bool);
263+
std::span<const bool> data_v(static_cast<const bool*>(data), data_byte_size);
264+
length = data_byte_size / sizeof(bool);
207265

208266
// Represent the Boolean vector with, at most, the last two
209267
// bits. In Arrow, Boolean values are LSB packed
210268
uint8_t packed_data = 0;
211-
for (size_t i = 0; i < count; ++i)
269+
for (size_t i = 0; i < length; ++i)
212270
packed_data |= (data_v[i] << i);
213271

214-
std::memcpy(data_.get(), &packed_data, 1);
215-
length = count;
272+
std::memcpy(data_.data(), &packed_data, 1);
216273
} break;
217274
case TILEDB_INT8:
218-
length = data_size / sizeof(int8_t);
219-
break;
220275
case TILEDB_UINT8:
221-
length = data_size / sizeof(uint8_t);
222-
break;
223276
case TILEDB_INT16:
224-
length = data_size / sizeof(int16_t);
225-
break;
226277
case TILEDB_UINT16:
227-
length = data_size / sizeof(uint16_t);
228-
break;
229278
case TILEDB_INT32:
230-
length = data_size / sizeof(int32_t);
231-
break;
232279
case TILEDB_UINT32:
233-
length = data_size / sizeof(uint32_t);
234-
break;
235280
case TILEDB_DATETIME_SEC:
236281
case TILEDB_DATETIME_MS:
237282
case TILEDB_DATETIME_US:
238283
case TILEDB_DATETIME_NS:
239284
case TILEDB_INT64:
240-
length = data_size / sizeof(int64_t);
241-
break;
242285
case TILEDB_UINT64:
243-
length = data_size / sizeof(uint64_t);
244-
break;
245286
case TILEDB_FLOAT32:
246-
length = data_size / sizeof(float_t);
247-
break;
248287
case TILEDB_FLOAT64:
249-
length = data_size / sizeof(double_t);
288+
length = data_byte_size / impl::type_size(enumeration.type());
250289
break;
251290
default:
252291
throw TileDBSOMAError(
@@ -750,17 +789,6 @@ std::pair<managed_unique_ptr<ArrowArray>, managed_unique_ptr<ArrowSchema>> Arrow
750789
exitIfError(ArrowArrayAllocateChildren(arr, 0), "Bad array children alloc");
751790
array->length = column->size();
752791

753-
if (column->is_nullable()) {
754-
schema->flags |= ARROW_FLAG_NULLABLE;
755-
756-
// Count nulls
757-
for (size_t i = 0; i < column->size(); ++i) {
758-
array->null_count += column->validity()[i] == 0;
759-
}
760-
} else {
761-
schema->flags &= ~ARROW_FLAG_NULLABLE;
762-
}
763-
764792
// Create an ArrowBuffer to manage the lifetime of `column`.
765793
// - `arrow_buffer` holds shared_ptr to `column`, increments
766794
// the use count and keeps the ColumnBuffer data alive.
@@ -771,6 +799,19 @@ std::pair<managed_unique_ptr<ArrowArray>, managed_unique_ptr<ArrowSchema>> Arrow
771799
// 0, the ColumnBuffer data will be deleted.
772800
auto arrow_buffer = new PrivateArrowBuffer(std::make_shared<ArrowBuffer>(column.get()));
773801

802+
if (column->is_nullable()) {
803+
schema->flags |= ARROW_FLAG_NULLABLE;
804+
805+
array->null_count = ArrowBitCountSet(arrow_buffer->buffer_->validity_.data(), 0, column->size());
806+
807+
// Count nulls
808+
// for (size_t i = 0; i < column->size(); ++i) {
809+
// array->null_count += column->validity()[i] == 0;
810+
// }
811+
} else {
812+
schema->flags &= ~ARROW_FLAG_NULLABLE;
813+
}
814+
774815
LOG_TRACE(
775816
fmt::format(
776817
"[ArrowAdapter] column type {} name {} nbuf {} {} nullable {}",
@@ -802,13 +843,13 @@ std::pair<managed_unique_ptr<ArrowArray>, managed_unique_ptr<ArrowSchema>> Arrow
802843
array->buffers = (const void**)malloc(sizeof(void*) * n_buffers);
803844
assert(array->buffers != nullptr);
804845
array->buffers[0] = nullptr; // validity addressed below
805-
array->buffers[n_buffers - 1] = arrow_buffer->buffer_->data_.get();
846+
array->buffers[n_buffers - 1] = arrow_buffer->buffer_->data_.data();
806847
if (n_buffers == 3) {
807-
array->buffers[1] = arrow_buffer->buffer_->large_offsets_.get();
848+
array->buffers[1] = arrow_buffer->buffer_->large_offsets_.data();
808849
}
809850

810851
if (column->is_nullable()) {
811-
array->buffers[0] = arrow_buffer->buffer_->validity_.get();
852+
array->buffers[0] = arrow_buffer->buffer_->validity_.data();
812853
}
813854

814855
if (column->is_ordered()) {
@@ -854,12 +895,12 @@ std::pair<managed_unique_ptr<ArrowArray>, managed_unique_ptr<ArrowSchema>> Arrow
854895
assert(dict_arr->buffers != nullptr);
855896

856897
dict_arr->buffers[0] = nullptr;
857-
dict_arr->buffers[dict_arr->n_buffers - 1] = enmr_buffer->buffer_->data_.get();
898+
dict_arr->buffers[dict_arr->n_buffers - 1] = enmr_buffer->buffer_->data_.data();
858899
if (is_var_enum) {
859900
if (downcast_dict_of_large_var) {
860-
dict_arr->buffers[1] = enmr_buffer->buffer_->small_offsets_.get();
901+
dict_arr->buffers[1] = enmr_buffer->buffer_->small_offsets_.data();
861902
} else {
862-
dict_arr->buffers[1] = enmr_buffer->buffer_->large_offsets_.get();
903+
dict_arr->buffers[1] = enmr_buffer->buffer_->large_offsets_.data();
863904
}
864905
}
865906

libtiledbsoma/src/utils/arrow_adapter.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <any>
1818
#include <concepts>
19+
#include <span>
1920

2021
#include <tiledb/tiledb>
2122
#include <tiledb/tiledb_experimental>
@@ -48,10 +49,11 @@ struct ArrowBuffer {
4849
ArrowBuffer(ReadColumnBuffer* buffer, bool large_offsets = true);
4950
ArrowBuffer(const Enumeration& enumeration, bool large_offsets = true);
5051

51-
std::unique_ptr<std::byte[]> data_;
52-
std::unique_ptr<int64_t[]> large_offsets_;
53-
std::unique_ptr<int32_t[]> small_offsets_;
54-
std::unique_ptr<std::byte[]> validity_;
52+
std::unique_ptr<std::byte[]> global_buffer_;
53+
std::span<std::byte> data_;
54+
std::span<int64_t> large_offsets_;
55+
std::span<int32_t> small_offsets_;
56+
std::span<uint8_t> validity_;
5557

5658
size_t length;
5759
std::string name;

0 commit comments

Comments
 (0)