Skip to content

Commit ed7836c

Browse files
[c++] Implement buffer resize and resubmission for read queries (#4375)
* Implement buffer resize for incomplete queries * Add tests for buffer resizing * Refactor different allocation schemes into strategies * Switch to uint64 * Remove copy constructor * Switch allocation strategy to shared ptr, add default size for memory pool * Make ArrayBuffers non-copyable * Change max_num_cells computation * Fix copy range for validity buffer * Set correct buffer for validity * Add docstrings * Fix rebasing conflicts * Fix type mismatch for macOS
1 parent 68cf78d commit ed7836c

File tree

10 files changed

+443
-126
lines changed

10 files changed

+443
-126
lines changed

libtiledbsoma/src/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ add_library(TILEDB_SOMA_OBJECTS OBJECT
8989
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_transformers.cc
9090
${CMAKE_CURRENT_SOURCE_DIR}/soma/array_buffers.cc
9191
${CMAKE_CURRENT_SOURCE_DIR}/soma/column_buffer.cc
92+
${CMAKE_CURRENT_SOURCE_DIR}/soma/column_buffer_strategies.cc
9293
${CMAKE_CURRENT_SOURCE_DIR}/tiledb_adapter/value_filter.cc
9394
${CMAKE_CURRENT_SOURCE_DIR}/tiledb_adapter/platform_config.cc
9495
${CMAKE_CURRENT_SOURCE_DIR}/utils/arrow_adapter.cc
@@ -251,6 +252,7 @@ install(FILES
251252
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_coordinates.h
252253
${CMAKE_CURRENT_SOURCE_DIR}/soma/array_buffers.h
253254
${CMAKE_CURRENT_SOURCE_DIR}/soma/column_buffer.h
255+
${CMAKE_CURRENT_SOURCE_DIR}/soma/column_buffer_strategies.h
254256
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_array.h
255257
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_group.h
256258
${CMAKE_CURRENT_SOURCE_DIR}/soma/soma_column.h

libtiledbsoma/src/soma/array_buffers.cc

Lines changed: 34 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -29,143 +29,63 @@ bool ArrayBuffers::use_memory_pool(const std::shared_ptr<tiledb::Array>& array)
2929
return use_memory_pool;
3030
}
3131

32-
ArrayBuffers::ArrayBuffers(const std::vector<std::string>& names, const tiledb::Array& array) {
33-
size_t memory_budget = DEFAULT_ALLOC_BYTES;
34-
auto config = array.config();
35-
if (config.contains(CONFIG_KEY_MEMORY_BUDGET)) {
36-
auto value_str = config.get(CONFIG_KEY_MEMORY_BUDGET);
37-
try {
38-
memory_budget = std::stoull(value_str);
39-
} catch (const std::exception& e) {
40-
throw TileDBSOMAError(
41-
fmt::format(
42-
"[ArrayBuffers] Error parsing {}: '{}' ({})", CONFIG_KEY_MEMORY_BUDGET, value_str, e.what()));
43-
}
44-
}
45-
46-
size_t factor = 1;
47-
if (config.contains(CONFIG_KEY_VAR_SIZED_FACTOR)) {
48-
auto value_str = config.get(CONFIG_KEY_VAR_SIZED_FACTOR);
49-
try {
50-
factor = std::stoull(value_str);
51-
} catch (const std::exception& e) {
52-
throw TileDBSOMAError(
53-
fmt::format(
54-
"[ArrayBuffers] Error parsing {}: '{}' ({})", CONFIG_KEY_VAR_SIZED_FACTOR, value_str, e.what()));
55-
}
32+
ArrayBuffers::ArrayBuffers(
33+
const std::vector<std::string>& names,
34+
const tiledb::Array& array,
35+
std::unique_ptr<ColumnBufferAllocationStrategy> strategy)
36+
: names_(names)
37+
, strategy_(std::move(strategy)) {
38+
if (!strategy_) {
39+
strategy_ = std::make_unique<BasicAllocationStrategy>(array);
5640
}
5741

58-
MemoryMode mode = ColumnBuffer::memory_mode(config);
59-
60-
ArraySchema schema = array.schema();
42+
MemoryMode mode = ColumnBuffer::memory_mode(array.config());
43+
const tiledb::ArraySchema schema = array.schema();
44+
const tiledb::Context& context = array.context();
6145
// Split the memory budget across the columns according to the byte size of each column's elements
6246
// Var sized columns will be allocated the same as an 8 byte datatype
6347

64-
// Ensure minimum buffer size is multiple of 8
65-
size_t memory_budget_unit = (memory_budget /
66-
std::transform_reduce(
67-
names.begin(),
68-
names.end(),
69-
0L,
70-
std::plus{},
71-
[&](auto name) {
72-
size_t weight = 0;
73-
74-
// Check if column is a TileDB attribute
75-
if (schema.has_attribute(name)) {
76-
Attribute attr = schema.attribute(name);
77-
78-
if (!attr.variable_sized() && attr.cell_val_num() != 1) {
79-
throw TileDBSOMAError(
80-
"[ArrayBuffers] Values per cell > 1 is not supported: " + name);
81-
}
82-
83-
weight += attr.nullable() ? 1 : 0;
84-
// If column has variable size add the offset array in the column budget
85-
weight += attr.variable_sized() ? sizeof(uint64_t) * (1 + 2 * factor) :
86-
tiledb::impl::type_size(attr.type());
87-
88-
return weight;
89-
}
90-
// Else check if column is a TileDB dimension
91-
else if (schema.domain().has_dimension(name)) {
92-
Dimension dim = schema.domain().dimension(name);
93-
94-
bool is_var = dim.cell_val_num() == TILEDB_VAR_NUM ||
95-
dim.type() == TILEDB_STRING_ASCII ||
96-
dim.type() == TILEDB_STRING_UTF8;
97-
98-
if (!is_var && dim.cell_val_num() != 1) {
99-
throw TileDBSOMAError(
100-
"[ArrayBuffers] Values per cell > 1 is not supported: " + name);
101-
}
102-
103-
weight += (dim.type() == TILEDB_STRING_ASCII ||
104-
dim.type() == TILEDB_STRING_UTF8) ?
105-
sizeof(uint64_t) * (1 + 2 * factor) :
106-
tiledb::impl::type_size(dim.type());
107-
108-
return weight;
109-
}
110-
111-
throw TileDBSOMAError(
112-
fmt::format("[ArrayBuffers] Missing column name '{}'", name));
113-
}) /
114-
8) *
115-
8;
116-
117-
for (const auto& name : names) {
118-
names_.push_back(name);
119-
48+
for (const auto& name : names_) {
12049
if (schema.has_attribute(name)) {
121-
Attribute attr = schema.attribute(name);
50+
tiledb::Attribute attribute = schema.attribute(name);
12251

123-
size_t column_budget = (attr.variable_sized() ? sizeof(uint64_t) * 2 * factor :
124-
tiledb::impl::type_size(attr.type())) *
125-
memory_budget_unit;
126-
127-
size_t num_cells = memory_budget_unit;
128-
129-
auto enum_name = AttributeExperimental::get_enumeration_name(schema.context(), attr);
52+
auto [column_budget, num_cells] = strategy_->get_buffer_sizes(attribute);
53+
auto enum_name = AttributeExperimental::get_enumeration_name(context, attribute);
13054
std::optional<Enumeration> enumeration = std::nullopt;
13155
bool is_ordered = false;
13256
if (enum_name.has_value()) {
133-
auto enmr = ArrayExperimental::get_enumeration(schema.context(), array, *enum_name);
134-
is_ordered = enmr.ordered();
135-
enumeration = std::make_optional<Enumeration>(enmr);
57+
enumeration = std::make_optional<Enumeration>(
58+
ArrayExperimental::get_enumeration(context, array, *enum_name));
59+
is_ordered = enumeration->ordered();
13660
}
13761

13862
buffers_.insert(
13963
std::make_pair(
14064
name,
14165
std::make_shared<CArrayColumnBuffer>(
14266
name,
143-
attr.type(),
67+
attribute.type(),
14468
num_cells,
14569
column_budget,
146-
attr.variable_sized(),
147-
attr.nullable(),
70+
attribute.variable_sized(),
71+
attribute.nullable(),
14872
enumeration,
14973
is_ordered,
15074
mode)));
15175
}
15276
// Else check if column is a TileDB dimension
15377
else if (schema.domain().has_dimension(name)) {
154-
Dimension dim = schema.domain().dimension(name);
155-
156-
bool is_var = dim.cell_val_num() == TILEDB_VAR_NUM || dim.type() == TILEDB_STRING_ASCII ||
157-
dim.type() == TILEDB_STRING_UTF8;
78+
tiledb::Dimension dimension = schema.domain().dimension(name);
15879

159-
// Ensure buffer size is multiple of 8
160-
size_t column_budget = (is_var ? sizeof(uint64_t) * 2 * factor : tiledb::impl::type_size(dim.type())) *
161-
memory_budget_unit;
162-
size_t num_cells = memory_budget_unit;
80+
auto [column_budget, num_cells] = strategy_->get_buffer_sizes(dimension);
81+
bool is_var = dimension.cell_val_num() == TILEDB_VAR_NUM || dimension.type() == TILEDB_STRING_ASCII ||
82+
dimension.type() == TILEDB_STRING_UTF8;
16383

16484
buffers_.insert(
16585
std::make_pair(
16686
name,
16787
std::make_shared<CArrayColumnBuffer>(
168-
name, dim.type(), num_cells, column_budget, is_var, false, std::nullopt, false, mode)));
88+
name, dimension.type(), num_cells, column_budget, is_var, false, std::nullopt, false, mode)));
16989
}
17090
}
17191
}
@@ -178,4 +98,12 @@ void ArrayBuffers::emplace(const std::string& name, std::shared_ptr<ColumnBuffer
17898
buffers_.emplace(name, buffer);
17999
}
180100

101+
void ArrayBuffers::expand_buffers() {
102+
for (const auto& name : names_) {
103+
std::shared_ptr<ReadColumnBuffer> buffer = at<ReadColumnBuffer>(name);
104+
buffer->resize(
105+
buffer->max_size() * DEFAULT_BUFFER_EXPANSION_FACTOR,
106+
buffer->max_num_cells() * DEFAULT_BUFFER_EXPANSION_FACTOR);
107+
}
108+
}
181109
} // namespace tiledbsoma

libtiledbsoma/src/soma/array_buffers.h

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,35 @@
1515
#define ARRAY_BUFFERS_H
1616

1717
#include <concepts>
18+
#include <functional>
1819
#include <stdexcept> // for windows: error C2039: 'runtime_error': is not a member of 'std'
19-
2020
#include <tiledb/tiledb>
2121

2222
#include "../utils/common.h"
2323
#include "column_buffer.h"
24+
#include "column_buffer_strategies.h"
2425

2526
namespace tiledbsoma {
2627

2728
using namespace tiledb;
2829

2930
class ArrayBuffers {
30-
inline static const size_t DEFAULT_ALLOC_BYTES = 1 << 28;
31+
inline static const size_t DEFAULT_BUFFER_EXPANSION_FACTOR = 2;
3132
inline static const std::string CONFIG_KEY_USE_MEMORY_POOL = "soma.read.use_memory_pool";
32-
inline static const std::string CONFIG_KEY_MEMORY_BUDGET = "soma.read.memory_budget";
33-
inline static const std::string CONFIG_KEY_VAR_SIZED_FACTOR = "soma.read.var_size_factor";
3433

3534
public:
3635
ArrayBuffers() = default;
37-
ArrayBuffers(const std::vector<std::string>& names, const tiledb::Array& array);
38-
ArrayBuffers(const ArrayBuffers&) = default;
36+
ArrayBuffers(
37+
const std::vector<std::string>& names,
38+
const tiledb::Array& array,
39+
std::unique_ptr<ColumnBufferAllocationStrategy> strategy = nullptr);
40+
41+
ArrayBuffers(const ArrayBuffers&) = delete;
3942
ArrayBuffers(ArrayBuffers&&) = default;
4043
~ArrayBuffers() = default;
4144

45+
ArrayBuffers& operator=(const ArrayBuffers&) = delete;
46+
4247
/**
4348
* @brief Return the buffer with the given name.
4449
*
@@ -105,12 +110,22 @@ class ArrayBuffers {
105110
*/
106111
static bool use_memory_pool(const std::shared_ptr<tiledb::Array>& array);
107112

113+
/**
114+
* @brief Double the size of the allocated buffers. Any data already in the buffers
115+
* will be deleted. By default this function will allocate more memory than the
116+
* memory budget set by the user.
117+
*/
118+
void expand_buffers();
119+
108120
private:
109121
// A vector of column names that maintains the order the columns were added
110122
std::vector<std::string> names_;
111123

112124
// Map: column name -> ColumnBuffer
113125
std::unordered_map<std::string, std::shared_ptr<ColumnBuffer>> buffers_;
126+
127+
// The allocation strategy used to split the available memory budget to the different columns.
128+
std::unique_ptr<ColumnBufferAllocationStrategy> strategy_;
114129
};
115130

116131
} // namespace tiledbsoma

libtiledbsoma/src/soma/column_buffer.cc

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@ std::unique_ptr<IArrowBufferStorage> ColumnBuffer::export_buffers() {
213213
}
214214
}
215215

216+
uint64_t ColumnBuffer::max_size() const {
217+
return max_data_size_;
218+
}
219+
220+
uint64_t ColumnBuffer::max_num_cells() const {
221+
return max_num_cells_;
222+
}
223+
216224
#pragma endregion
217225

218226
#pragma region private non-static
@@ -457,6 +465,45 @@ std::unique_ptr<IArrowBufferStorage> CArrayColumnBuffer::export_buffers() {
457465
}
458466
}
459467

468+
void CArrayColumnBuffer::resize(const uint64_t num_bytes, const uint64_t num_cells, const bool preserve_data) {
469+
std::unique_ptr<std::byte[]> data_buffer = std::make_unique_for_overwrite<std::byte[]>(num_bytes);
470+
std::unique_ptr<uint64_t[]> offsets_buffer;
471+
std::unique_ptr<uint8_t[]> validity_buffer;
472+
473+
if (is_var()) {
474+
offsets_buffer = std::make_unique_for_overwrite<uint64_t[]>(num_cells + 1);
475+
}
476+
477+
if (is_nullable()) {
478+
validity_buffer = std::make_unique_for_overwrite<uint8_t[]>(num_cells);
479+
}
480+
481+
if (preserve_data) {
482+
std::memcpy(data_buffer.get(), data_.get(), std::min(num_bytes, data_size_));
483+
484+
if (is_var()) {
485+
std::memcpy(
486+
offsets_buffer.get(), offsets_.get(), std::min(num_cells + 1, num_cells_ + 1) * sizeof(uint64_t));
487+
}
488+
489+
if (is_nullable()) {
490+
std::memcpy(validity_buffer.get(), validity_.get(), std::min(num_cells, num_cells_) * sizeof(uint8_t));
491+
}
492+
493+
data_size_ = std::min(num_bytes, data_size_);
494+
num_cells_ = std::min(num_cells, num_cells_);
495+
}
496+
497+
max_data_size_ = num_bytes;
498+
max_num_cells_ = num_cells;
499+
500+
data_ = std::move(data_buffer);
501+
offsets_ = std::move(offsets_buffer);
502+
validity_ = std::move(validity_buffer);
503+
504+
num_cells_ = std::min(num_cells_, num_cells);
505+
}
506+
460507
#pragma endregion
461508

462509
#pragma endregion
@@ -727,4 +774,42 @@ std::shared_ptr<ColumnBuffer> VectorColumnBuffer::alloc(
727774
name, type, num_cells, num_bytes, is_var, is_nullable, enumeration, is_ordered, mode);
728775
}
729776

777+
void VectorColumnBuffer::resize(const uint64_t num_bytes, const uint64_t num_cells, const bool preserve_data) {
778+
std::vector<std::byte, NoInitAlloc<std::byte>> data_buffer(num_bytes);
779+
std::vector<uint64_t, NoInitAlloc<uint64_t>> offsets_buffer;
780+
std::vector<uint8_t, NoInitAlloc<uint8_t>> validity_buffer;
781+
782+
if (is_var()) {
783+
offsets_buffer.resize(num_cells + 1);
784+
}
785+
786+
if (is_nullable()) {
787+
validity_buffer.resize(num_cells);
788+
}
789+
790+
if (preserve_data) {
791+
std::memcpy(data_buffer.data(), data_.data(), std::min(num_bytes, data_size_));
792+
793+
if (is_var()) {
794+
std::memcpy(
795+
offsets_buffer.data(), offsets_.data(), std::min(num_cells + 1, num_cells_ + 1) * sizeof(uint64_t));
796+
}
797+
798+
if (is_nullable()) {
799+
std::memcpy(validity_buffer.data(), validity_.data(), std::min(num_cells, num_cells_) * sizeof(uint8_t));
800+
}
801+
802+
data_size_ = std::min(num_bytes, data_size_);
803+
num_cells_ = std::min(num_cells, num_cells_);
804+
}
805+
806+
max_data_size_ = num_bytes;
807+
max_num_cells_ = num_cells;
808+
809+
data_ = data_buffer;
810+
offsets_ = offsets_buffer;
811+
validity_ = validity_buffer;
812+
813+
num_cells_ = std::min(num_cells_, num_cells);
814+
}
730815
} // namespace tiledbsoma

0 commit comments

Comments
 (0)