Skip to content

Commit f2da391

Browse files
Add a multithreaded alternative to to_arrow
1 parent 2c6aea3 commit f2da391

File tree

3 files changed

+41
-3
lines changed

3 files changed

+41
-3
lines changed

apis/python/src/tiledbsoma/common.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,13 +195,18 @@ py::object _buffer_to_table(std::shared_ptr<ArrayBuffers> buffers) {
195195
py::list array_list;
196196
py::list field_list;
197197

198-
for (auto& name : buffers->names()) {
199-
auto [pa_array, pa_schema] = ArrowAdapter::to_arrow(buffers->at<ReadColumnBuffer>(name));
198+
auto column_names = buffers->names();
199+
200+
auto arrays = ArrowAdapter::buffer_to_arrow(buffers);
201+
202+
for (size_t i = 0; i < column_names.size(); ++i) {
203+
auto& [pa_array, pa_schema] = arrays[i];
200204
auto nullable = (pa_schema->flags & ARROW_FLAG_NULLABLE) != 0;
201205
auto dtype = pa_dtype_import(py::capsule(pa_schema.get()));
202206
array_list.append(pa_array_import(py::capsule(pa_array.get()), dtype));
203-
field_list.append(pa.attr("field")(name, dtype, nullable));
207+
field_list.append(pa.attr("field")(column_names[i], dtype, nullable));
204208
}
209+
205210
return pa_table_from_arrays(array_list, "schema"_a = pa.attr("schema")(field_list));
206211
}
207212

libtiledbsoma/src/utils/arrow_adapter.cc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
*/
1313

1414
#include <ranges>
15+
#include <thread>
1516

17+
#include "../soma/array_buffers.h"
1618
#include "../soma/column_buffer.h"
1719
#include "arrow_adapter.h"
1820
#include "logger.h"
@@ -756,6 +758,28 @@ inline void exitIfError(const ArrowErrorCode ec, const std::string& msg) {
756758
throw TileDBSOMAError(fmt::format("ArrowAdapter: Arrow Error {} ", msg));
757759
}
758760

761+
std::vector<std::pair<managed_unique_ptr<ArrowArray>, managed_unique_ptr<ArrowSchema>>> ArrowAdapter::buffer_to_arrow(
762+
std::shared_ptr<ArrayBuffers> buffer, bool downcast_dict_of_large_var) {
763+
std::vector<std::future<std::pair<managed_unique_ptr<ArrowArray>, managed_unique_ptr<ArrowSchema>>>> arrow_columns;
764+
765+
for (const auto& name : buffer->names()) {
766+
arrow_columns.emplace_back(
767+
std::async(
768+
std::launch::async,
769+
ArrowAdapter::to_arrow,
770+
buffer->at<ReadColumnBuffer>(name),
771+
downcast_dict_of_large_var));
772+
}
773+
774+
std::vector<std::pair<managed_unique_ptr<ArrowArray>, managed_unique_ptr<ArrowSchema>>> columns;
775+
776+
for (auto& arrow_column : arrow_columns) {
777+
columns.push_back(std::move(arrow_column.get()));
778+
}
779+
780+
return columns;
781+
}
782+
759783
std::pair<managed_unique_ptr<ArrowArray>, managed_unique_ptr<ArrowSchema>> ArrowAdapter::to_arrow(
760784
std::shared_ptr<ReadColumnBuffer> column, bool downcast_dict_of_large_var) {
761785
managed_unique_ptr<ArrowSchema> schema = make_managed_unique<ArrowSchema>();

libtiledbsoma/src/utils/arrow_adapter.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ using namespace tiledb;
3535
using json = nlohmann::json;
3636

3737
class ReadColumnBuffer;
38+
class ArrayBuffers;
3839
class SOMACoordinateSpace;
3940

4041
/**
@@ -106,6 +107,14 @@ class ArrowAdapter {
106107

107108
static bool _isstr(const char* format);
108109

110+
/**
111+
* @brief Convert ArrayBuffer to an list of Arrow arrays.
112+
*
113+
* @return std::vector<std::pair<std::unique_ptr<ArrowArray>, std::unique_ptr<ArrowSchema>>>
114+
*/
115+
static std::vector<std::pair<managed_unique_ptr<ArrowArray>, managed_unique_ptr<ArrowSchema>>> buffer_to_arrow(
116+
std::shared_ptr<ArrayBuffers> buffers, bool downcast_dict_of_large_var = false);
117+
109118
/**
110119
* @brief Convert ColumnBuffer to an Arrow array.
111120
*

0 commit comments

Comments
 (0)