Skip to content

Commit 6aa599f

Browse files
Refactor fragment consolidation with class FragmentConsolidationWorkspace (#4605)
This PR puts the query buffers used inside the fragment consolidator into their own class. This class deletes its copy constructor but provides a move constructor. This avoids copying the buffers in the `return` statement of `create_buffers`, which not only doubles the instantaneous memory consumption during that statement, but does a bunch of spurious copying of empty buffers. In addition, this change prepares for memory as a managed resource, providing a single location for the allocators of the containers for these buffers to be specified. --- TYPE: NO_HISTORY DESC: `class FragmentConsolidationWorkspace`
1 parent 31500c5 commit 6aa599f

File tree

4 files changed

+168
-125
lines changed

4 files changed

+168
-125
lines changed

tiledb/sm/consolidator/consolidator.h

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,18 @@ class Consolidator {
125125
*/
126126
virtual void vacuum(const char* array_name);
127127

128+
/* ********************************* */
129+
/* TYPE DEFINITIONS */
130+
/* ********************************* */
131+
132+
/** Consolidation configuration parameters. */
133+
struct ConsolidationConfigBase {
134+
/** Start time for consolidation. */
135+
uint64_t timestamp_start_;
136+
/** End time for consolidation. */
137+
uint64_t timestamp_end_;
138+
};
139+
128140
protected:
129141
/* ********************************* */
130142
/* PROTECTED METHODS */
@@ -144,18 +156,6 @@ class Consolidator {
144156
*/
145157
void check_array_uri(const char* array_name);
146158

147-
/* ********************************* */
148-
/* TYPE DEFINITIONS */
149-
/* ********************************* */
150-
151-
/** Consolidation configuration parameters. */
152-
struct ConsolidationConfigBase {
153-
/** Start time for consolidation. */
154-
uint64_t timestamp_start_;
155-
/** End time for consolidation. */
156-
uint64_t timestamp_end_;
157-
};
158-
159159
/* ********************************* */
160160
/* PROTECTED ATTRIBUTES */
161161
/* ********************************* */

tiledb/sm/consolidator/fragment_consolidator.cc

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ using namespace tiledb::common;
5151

5252
namespace tiledb::sm {
5353

54-
class FragmentConsolidatorStatusException : public StatusException {
54+
class FragmentConsolidatorException : public StatusException {
5555
public:
56-
explicit FragmentConsolidatorStatusException(const std::string& message)
56+
explicit FragmentConsolidatorException(const std::string& message)
5757
: StatusException("FragmentConsolidator", message) {
5858
}
5959
};
@@ -293,7 +293,7 @@ Status FragmentConsolidator::consolidate_fragments(
293293

294294
void FragmentConsolidator::vacuum(const char* array_name) {
295295
if (array_name == nullptr) {
296-
throw Status_StorageManagerError(
296+
throw FragmentConsolidatorException(
297297
"Cannot vacuum fragments; Array name cannot be null");
298298
}
299299

@@ -348,7 +348,7 @@ bool FragmentConsolidator::are_consolidatable(
348348
size_t start,
349349
size_t end,
350350
const NDRange& union_non_empty_domains) const {
351-
auto anterior_ndrange = fragment_info.anterior_ndrange();
351+
const auto& anterior_ndrange = fragment_info.anterior_ndrange();
352352
if (anterior_ndrange.size() != 0 &&
353353
domain.overlap(union_non_empty_domains, anterior_ndrange))
354354
return false;
@@ -419,8 +419,8 @@ Status FragmentConsolidator::consolidate_internal(
419419

420420
// Prepare buffers
421421
auto average_var_cell_sizes = array_for_reads->get_average_var_cell_sizes();
422-
auto&& [buffers, buffer_sizes] =
423-
create_buffers(stats_, config_, array_schema, average_var_cell_sizes);
422+
FragmentConsolidationWorkspace cw{
423+
create_buffers(stats_, config_, array_schema, average_var_cell_sizes)};
424424

425425
// Create queries
426426
auto query_r = (Query*)nullptr;
@@ -451,12 +451,13 @@ Status FragmentConsolidator::consolidate_internal(
451451
}
452452

453453
// Read from one array and write to the other
454-
st = copy_array(query_r, query_w, &buffers, &buffer_sizes);
455-
if (!st.ok()) {
454+
try {
455+
copy_array(query_r, query_w, cw);
456+
} catch (...) {
456457
tdb_delete(query_r);
457458
tdb_delete(query_w);
458-
return st;
459-
}
459+
throw;
460+
};
460461

461462
// Finalize write query
462463
st = query_w->finalize();
@@ -495,47 +496,41 @@ Status FragmentConsolidator::consolidate_internal(
495496
return st;
496497
}
497498

498-
Status FragmentConsolidator::copy_array(
499-
Query* query_r,
500-
Query* query_w,
501-
std::vector<ByteVec>* buffers,
502-
std::vector<uint64_t>* buffer_sizes) {
499+
void FragmentConsolidator::copy_array(
500+
Query* query_r, Query* query_w, FragmentConsolidationWorkspace& cw) {
503501
auto timer_se = stats_->start_timer("consolidate_copy_array");
504502

505503
// Set the read query buffers outside the repeated submissions.
506504
// The Reader will reset the query buffer sizes to the original
507505
// sizes, not the potentially smaller sizes of the results after
508506
// the query submission.
509-
RETURN_NOT_OK(set_query_buffers(query_r, buffers, buffer_sizes));
507+
set_query_buffers(query_r, cw);
510508

511509
do {
512510
// READ
513-
RETURN_NOT_OK(query_r->submit());
511+
throw_if_not_ok(query_r->submit());
514512

515513
// If Consolidation cannot make any progress, throw. The first buffer will
516-
// always contain fixed size data, wether it is tile offsets for var size
514+
// always contain fixed size data, whether it is tile offsets for var size
517515
// attribute/dimension or the actual fixed size data so we can use its size
518516
// to know if any cells were written or not.
519-
if (buffer_sizes->at(0) == 0) {
520-
throw FragmentConsolidatorStatusException(
517+
if (cw.sizes().at(0) == 0) {
518+
throw FragmentConsolidatorException(
521519
"Consolidation read 0 cells, no progress can be made");
522520
}
523521

524522
// Set explicitly the write query buffers, as the sizes may have
525523
// been altered by the read query.
526-
RETURN_NOT_OK(set_query_buffers(query_w, buffers, buffer_sizes));
524+
set_query_buffers(query_w, cw);
527525

528526
// WRITE
529-
RETURN_NOT_OK(query_w->submit());
527+
throw_if_not_ok(query_w->submit());
530528
} while (query_r->status() == QueryStatus::INCOMPLETE);
531-
532-
return Status::Ok();
533529
}
534530

535-
tuple<std::vector<ByteVec>, std::vector<uint64_t>>
536-
FragmentConsolidator::create_buffers(
531+
FragmentConsolidationWorkspace FragmentConsolidator::create_buffers(
537532
stats::Stats* stats,
538-
const ConsolidationConfig& config,
533+
const FragmentConsolidationConfig& config,
539534
const ArraySchema& array_schema,
540535
std::unordered_map<std::string, uint64_t>& avg_cell_sizes) {
541536
auto timer_se = stats->start_timer("consolidate_create_buffers");
@@ -610,10 +605,11 @@ FragmentConsolidator::create_buffers(
610605
}
611606

612607
// Create buffers.
613-
std::vector<ByteVec> buffers(buffer_num);
614-
std::vector<uint64_t> buffer_sizes(buffer_num);
615-
auto total_weights =
616-
std::accumulate(buffer_weights.begin(), buffer_weights.end(), 0);
608+
FragmentConsolidationWorkspace cw{buffer_num};
609+
std::vector<ByteVec>& buffers{cw.buffers()};
610+
std::vector<uint64_t>& buffer_sizes{cw.sizes()};
611+
auto total_weights = std::accumulate(
612+
buffer_weights.begin(), buffer_weights.end(), static_cast<size_t>(0));
617613

618614
// Allocate space for each buffer.
619615
uint64_t adjusted_budget = total_budget / total_weights * total_weights;
@@ -624,7 +620,7 @@ FragmentConsolidator::create_buffers(
624620
}
625621

626622
// Success
627-
return {buffers, buffer_sizes};
623+
return cw;
628624
}
629625

630626
Status FragmentConsolidator::create_queries(
@@ -810,10 +806,11 @@ Status FragmentConsolidator::compute_next_to_consolidate(
810806
return Status::Ok();
811807
}
812808

813-
Status FragmentConsolidator::set_query_buffers(
814-
Query* query,
815-
std::vector<ByteVec>* buffers,
816-
std::vector<uint64_t>* buffer_sizes) const {
809+
void FragmentConsolidator::set_query_buffers(
810+
Query* query, FragmentConsolidationWorkspace& cw) const {
811+
std::vector<ByteVec>* buffers{&cw.buffers()};
812+
std::vector<uint64_t>* buffer_sizes{&cw.sizes()};
813+
817814
const auto& array_schema = query->array_schema();
818815
auto dim_num = array_schema.dim_num();
819816
auto dense = array_schema.dense();
@@ -892,8 +889,6 @@ Status FragmentConsolidator::set_query_buffers(
892889
&(*buffer_sizes)[bid]));
893890
++bid;
894891
}
895-
896-
return Status::Ok();
897892
}
898893

899894
Status FragmentConsolidator::set_config(const Config& config) {

0 commit comments

Comments
 (0)