Skip to content

Commit c3a3e3a

Browse files
committed
Use a concurrent buffer deque in FragmentConsolidation.
1 parent 8ef43c4 commit c3a3e3a

File tree

6 files changed

+149
-107
lines changed

6 files changed

+149
-107
lines changed

test/src/unit-capi-consolidation.cc

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*
66
* The MIT License
77
*
8-
* @copyright Copyright (c) 2017-2021 TileDB Inc.
8+
* @copyright Copyright (c) 2017-2025 TileDB Inc.
99
*
1010
* Permission is hereby granted, free of charge, to any person obtaining a copy
1111
* of this software and associated documentation files (the "Software"), to deal
@@ -7483,29 +7483,6 @@ TEST_CASE_METHOD(
74837483
}
74847484
}
74857485

7486-
TEST_CASE_METHOD(
7487-
ConsolidationFx,
7488-
"C API: Test consolidation, sparse string, no progress",
7489-
"[capi][consolidation][sparse][string][no-progress][non-rest]") {
7490-
remove_sparse_string_array();
7491-
create_sparse_string_array();
7492-
7493-
write_sparse_string_full();
7494-
write_sparse_string_unordered();
7495-
consolidate_sparse_string(1, true);
7496-
7497-
tiledb_error_t* err = NULL;
7498-
tiledb_ctx_get_last_error(ctx_, &err);
7499-
7500-
const char* msg;
7501-
tiledb_error_message(err, &msg);
7502-
CHECK(
7503-
std::string("FragmentConsolidator: Consolidation read 0 cells, no "
7504-
"progress can be made") == msg);
7505-
7506-
remove_sparse_string_array();
7507-
}
7508-
75097486
TEST_CASE_METHOD(
75107487
ConsolidationFx,
75117488
"C API: Test consolidation, fragments/commits out of order",

tiledb/sm/consolidator/fragment_consolidator.cc

Lines changed: 126 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,6 @@ Status FragmentConsolidator::consolidate(
248248
return st;
249249
}
250250

251-
FragmentConsolidationWorkspace cw(consolidator_memory_tracker_);
252-
253251
uint32_t step = 0;
254252
std::vector<TimestampedURI> to_consolidate;
255253
do {
@@ -277,17 +275,17 @@ Status FragmentConsolidator::consolidate(
277275

278276
// Consolidate the selected fragments
279277
URI new_fragment_uri;
280-
st = consolidate_internal(
281-
array_for_reads,
282-
array_for_writes,
283-
to_consolidate,
284-
union_non_empty_domains,
285-
&new_fragment_uri,
286-
cw);
287-
if (!st.ok()) {
278+
try {
279+
consolidate_internal(
280+
array_for_reads,
281+
array_for_writes,
282+
to_consolidate,
283+
union_non_empty_domains,
284+
&new_fragment_uri);
285+
} catch (...) {
288286
throw_if_not_ok(array_for_reads->close());
289287
throw_if_not_ok(array_for_writes->close());
290-
return st;
288+
throw;
291289
}
292290

293291
// Load info of the consolidated fragment and add it
@@ -463,21 +461,19 @@ Status FragmentConsolidator::consolidate_fragments(
463461
}
464462
}
465463

466-
FragmentConsolidationWorkspace cw(consolidator_memory_tracker_);
467-
468464
// Consolidate the selected fragments
469465
URI new_fragment_uri;
470-
st = consolidate_internal(
471-
array_for_reads,
472-
array_for_writes,
473-
to_consolidate,
474-
union_non_empty_domains,
475-
&new_fragment_uri,
476-
cw);
477-
if (!st.ok()) {
466+
try {
467+
consolidate_internal(
468+
array_for_reads,
469+
array_for_writes,
470+
to_consolidate,
471+
union_non_empty_domains,
472+
&new_fragment_uri);
473+
} catch (...) {
478474
throw_if_not_ok(array_for_reads->close());
479475
throw_if_not_ok(array_for_writes->close());
480-
return st;
476+
throw;
481477
}
482478

483479
// Load info of the consolidated fragment and add it
@@ -576,19 +572,18 @@ bool FragmentConsolidator::are_consolidatable(
576572
return (double(union_cell_num) / sum_cell_num) <= config_.amplification_;
577573
}
578574

579-
Status FragmentConsolidator::consolidate_internal(
575+
void FragmentConsolidator::consolidate_internal(
580576
shared_ptr<Array> array_for_reads,
581577
shared_ptr<Array> array_for_writes,
582578
const std::vector<TimestampedURI>& to_consolidate,
583579
const NDRange& union_non_empty_domains,
584-
URI* new_fragment_uri,
585-
FragmentConsolidationWorkspace& cw) {
580+
URI* new_fragment_uri) {
586581
auto timer_se = stats_->start_timer("consolidate_internal");
587582

588583
array_for_reads->load_fragments(to_consolidate);
589584

590585
if (array_for_reads->is_empty()) {
591-
return Status::Ok();
586+
return;
592587
}
593588

594589
// Get schema
@@ -622,20 +617,8 @@ Status FragmentConsolidator::consolidate_internal(
622617
}
623618
}
624619

625-
// Compute memory budgets
626-
uint64_t total_weights =
627-
config_.buffers_weight_ + config_.reader_weight_ + config_.writer_weight_;
628-
uint64_t single_unit_budget = config_.total_budget_ / total_weights;
629-
uint64_t buffers_budget = config_.buffers_weight_ * single_unit_budget;
630-
uint64_t reader_budget = config_.reader_weight_ * single_unit_budget;
631-
uint64_t writer_budget = config_.writer_weight_ * single_unit_budget;
632-
633-
// Prepare buffers
634-
auto average_var_cell_sizes = array_for_reads->get_average_var_cell_sizes();
635-
cw.resize_buffers(
636-
stats_, config_, array_schema, average_var_cell_sizes, buffers_budget);
637-
638620
// Create queries
621+
uint64_t buffer_size = 10485760; // 10 MB
639622
tdb_unique_ptr<Query> query_r = nullptr;
640623
tdb_unique_ptr<Query> query_w = nullptr;
641624
throw_if_not_ok(create_queries(
@@ -645,28 +628,33 @@ Status FragmentConsolidator::consolidate_internal(
645628
query_r,
646629
query_w,
647630
new_fragment_uri,
648-
reader_budget,
649-
writer_budget));
631+
buffer_size,
632+
buffer_size));
650633

651634
// Get the vacuum URI
652635
URI vac_uri;
653636
try {
654637
vac_uri =
655638
array_for_reads->array_directory().get_vacuum_uri(*new_fragment_uri);
656639
} catch (std::exception& e) {
657-
FragmentConsolidatorException(
640+
throw FragmentConsolidatorException(
658641
"Internal consolidation failed with exception" + std::string(e.what()));
659642
}
660643

661644
// Read from one array and write to the other
662-
copy_array(query_r.get(), query_w.get(), cw);
645+
copy_array(
646+
query_r.get(),
647+
query_w.get(),
648+
array_schema,
649+
array_for_reads->get_average_var_cell_sizes(),
650+
buffer_size);
663651

664652
// Finalize write query
665653
auto st = query_w->finalize();
666654
if (!st.ok()) {
667655
if (resources_.vfs().is_dir(*new_fragment_uri))
668656
resources_.vfs().remove_dir(*new_fragment_uri);
669-
return st;
657+
throw FragmentConsolidatorException(st.message());
670658
}
671659

672660
// Write vacuum file
@@ -678,42 +666,110 @@ Status FragmentConsolidator::consolidate_internal(
678666
if (!st.ok()) {
679667
if (resources_.vfs().is_dir(*new_fragment_uri))
680668
resources_.vfs().remove_dir(*new_fragment_uri);
681-
return st;
669+
throw FragmentConsolidatorException(st.message());
682670
}
683-
684-
return st;
685671
}
686672

687673
void FragmentConsolidator::copy_array(
688-
Query* query_r, Query* query_w, FragmentConsolidationWorkspace& cw) {
689-
auto timer_se = stats_->start_timer("consolidate_copy_array");
674+
Query* query_r,
675+
Query* query_w,
676+
const ArraySchema& reader_array_schema_latest,
677+
std::unordered_map<std::string, uint64_t> average_var_cell_sizes,
678+
uint64_t buffer_size) {
679+
// The maximum number of buffers the reader may allocate.
680+
uint64_t max_buffer_count = 10;
681+
682+
// Deque which stores the buffers passed between the reader and writer. Cannot
683+
// exceed size `max_buffer_count`.
684+
ProducerConsumerQueue<std::variant<
685+
tdb_shared_ptr<FragmentConsolidationWorkspace>,
686+
std::exception_ptr>>
687+
buffer_queue;
688+
689+
// Atomic counter of the queue buffers allocated by the reader.
690+
// May not exceed `max_buffer_count`.
691+
std::atomic<uint64_t> buffer_count = 0;
692+
693+
// Flag indicating an ongoing read. The reader will stop once set to `false`.
694+
std::atomic<bool> reading = true;
695+
696+
// Reader
697+
auto& io_tp = resources_.io_tp();
698+
ThreadPool::Task read_task = io_tp.execute([&] {
699+
while (reading) {
700+
tdb_shared_ptr<FragmentConsolidationWorkspace> cw =
701+
tdb::make_shared<FragmentConsolidationWorkspace>(
702+
HERE(), consolidator_memory_tracker_);
703+
// READ
704+
try {
705+
// Set the read query buffers.
706+
cw->resize_buffers(
707+
stats_,
708+
config_,
709+
reader_array_schema_latest,
710+
average_var_cell_sizes,
711+
buffer_size);
712+
set_query_buffers(query_r, *cw.get());
713+
throw_if_not_ok(query_r->submit());
714+
715+
// Only continue if Consolidation can make progress. The first buffer
716+
// will always contain fixed size data, whether it is tile offsets for
717+
// var size attribute/dimension or the actual fixed size data so we can
718+
// use its size to know if any cells were written or not.
719+
if (cw->sizes().at(0) > 0) {
720+
buffer_queue.push(cw);
721+
}
690722

691-
// Set the read query buffers outside the repeated submissions.
692-
// The Reader will reset the query buffer sizes to the original
693-
// sizes, not the potentially smaller sizes of the results after
694-
// the query submission.
695-
set_query_buffers(query_r, cw);
723+
// Once the read is complete, drain the queue and exit the reader.
724+
// Note: drain() shuts down the queue without removing elements.
725+
// The write fiber will be notified and write the remaining chunks.
726+
if (query_r->status() != QueryStatus::INCOMPLETE) {
727+
buffer_queue.drain();
728+
reading = false;
729+
break;
730+
}
731+
} catch (...) {
732+
// Enqueue caught-exceptions to be handled by the writer.
733+
buffer_queue.push(std::current_exception());
734+
reading = false;
735+
}
736+
buffer_count++;
696737

697-
do {
698-
// READ
699-
throw_if_not_ok(query_r->submit());
700-
701-
// If Consolidation cannot make any progress, throw. The first buffer will
702-
// always contain fixed size data, whether it is tile offsets for var size
703-
// attribute/dimension or the actual fixed size data so we can use its size
704-
// to know if any cells were written or not.
705-
if (cw.sizes().at(0) == 0) {
706-
throw FragmentConsolidatorException(
707-
"Consolidation read 0 cells, no progress can be made");
738+
io_tp.wait_until([&]() { return buffer_count < max_buffer_count; });
739+
}
740+
return Status::Ok();
741+
});
742+
743+
// Writer
744+
while (true) {
745+
// Allow ProducerConsumerQueue to wait for an element to be enqueued.
746+
auto buffer_queue_element = buffer_queue.pop_back();
747+
if (!buffer_queue_element.has_value()) {
748+
// Stop writing once the queue is empty
749+
break;
708750
}
709751

710-
// Set explicitly the write query buffers, as the sizes may have
711-
// been altered by the read query.
712-
set_query_buffers(query_w, cw);
752+
auto& buffer = buffer_queue_element.value();
753+
// Rethrow read-enqueued exceptions.
754+
if (std::holds_alternative<std::exception_ptr>(buffer)) {
755+
std::rethrow_exception(std::get<std::exception_ptr>(buffer));
756+
}
713757

714758
// WRITE
715-
throw_if_not_ok(query_w->submit());
716-
} while (query_r->status() == QueryStatus::INCOMPLETE);
759+
auto& writebuf = std::get<0>(buffer);
760+
try {
761+
// Explicitly set the write query buffers, as the sizes may have
762+
// been altered by the read query.
763+
set_query_buffers(query_w, *writebuf.get());
764+
throw_if_not_ok(query_w->submit());
765+
buffer_count--;
766+
} catch (...) {
767+
reading = false; // Stop the reader.
768+
throw;
769+
}
770+
}
771+
772+
throw_if_not_ok(read_task.wait());
717773
}
718774

719775
Status FragmentConsolidator::create_queries(

tiledb/sm/consolidator/fragment_consolidator.h

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*
66
* The MIT License
77
*
8-
* @copyright Copyright (c) 2022-2024 TileDB, Inc.
8+
* @copyright Copyright (c) 2022-2025 TileDB, Inc.
99
*
1010
* Permission is hereby granted, free of charge, to any person obtaining a copy
1111
* of this software and associated documentation files (the "Software"), to deal
@@ -293,27 +293,32 @@ class FragmentConsolidator : public Consolidator {
293293
* @param new_fragment_uri The URI of the fragment created after
294294
* consolidating the `to_consolidate` fragments.
295295
* @param cw A workspace containing buffers for the queries
296-
* @return Status
297296
*/
298-
Status consolidate_internal(
297+
void consolidate_internal(
299298
shared_ptr<Array> array_for_reads,
300299
shared_ptr<Array> array_for_writes,
301300
const std::vector<TimestampedURI>& to_consolidate,
302301
const NDRange& union_non_empty_domains,
303-
URI* new_fragment_uri,
304-
FragmentConsolidationWorkspace& cw);
302+
URI* new_fragment_uri);
305303

306304
/**
307-
* Copies the array by reading from the fragments to be consolidated
308-
* with `query_r` and writing to the new fragment with `query_w`.
305+
* Copies the array by concurrently reading from the fragments to be
306+
* consolidated with `query_r` and writing to the new fragment with `query_w`.
309307
* It also appropriately sets the query buffers.
310308
*
311309
* @param query_r The read query.
312310
* @param query_w The write query.
313-
* @param cw A workspace containing buffers for the queries
311+
* @param reader_array_schema_latest The reader's latest array schema.
312+
* @param avg_var_cell_sizes A map of the reader's computed average cell size
313+
* for var size attrs / dims.
314+
* @param buffer_size The size of the buffers.
314315
*/
315316
void copy_array(
316-
Query* query_r, Query* query_w, FragmentConsolidationWorkspace& cw);
317+
Query* query_r,
318+
Query* query_w,
319+
const ArraySchema& reader_array_schema_latest,
320+
std::unordered_map<std::string, uint64_t> avg_var_cell_sizes,
321+
uint64_t buffer_size);
317322

318323
/**
319324
* Creates the queries needed for consolidation. It also retrieves

tiledb/sm/filesystem/mem_filesystem.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ class MemFilesystem::File : public MemFilesystem::FSNode {
185185

186186
if (offset + nbytes > size_)
187187
return LOG_STATUS(Status_MemFSError(fmt::format(
188-
"Cannot read from file; Read exceeds file size: offset {} nbytes {} "
188+
"Cannot read from file; Read exceeds file size: offset {} nbytes "
189+
"{} "
189190
"size_ {}",
190191
offset,
191192
nbytes,
@@ -559,7 +560,8 @@ MemFilesystem::FSNode* MemFilesystem::create_dir_internal(
559560
cur->children_[token] = tdb_unique_ptr<FSNode>(tdb_new(Directory));
560561
} else if (!cur->is_dir()) {
561562
throw MemFSException(std::string(
562-
"Cannot create directory, a file with that name exists already: " +
563+
"Cannot create directory, a file with that name exists "
564+
"already: " +
563565
path));
564566
}
565567

0 commit comments

Comments
 (0)