Skip to content

Commit 83f95bd

Browse files
committed
Wait for read task to finish when writer fails, finalize in copy_array.
1 parent c38710a commit 83f95bd

File tree

2 files changed

+69
-46
lines changed

2 files changed

+69
-46
lines changed

tiledb/sm/consolidator/fragment_consolidator.cc

Lines changed: 60 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,17 @@ Status FragmentConsolidator::consolidate(
275275

276276
// Consolidate the selected fragments
277277
URI new_fragment_uri;
278-
try {
279-
consolidate_internal(
280-
array_for_reads,
281-
array_for_writes,
282-
to_consolidate,
283-
union_non_empty_domains,
284-
&new_fragment_uri);
285-
} catch (...) {
278+
st = consolidate_internal(
279+
array_for_reads,
280+
array_for_writes,
281+
to_consolidate,
282+
union_non_empty_domains,
283+
&new_fragment_uri);
284+
if (!st.ok()) {
285+
std::cerr << "FAILED: " << st.message() << std::endl;
286286
throw_if_not_ok(array_for_reads->close());
287287
throw_if_not_ok(array_for_writes->close());
288-
throw;
288+
return st;
289289
}
290290

291291
// Load info of the consolidated fragment and add it
@@ -463,17 +463,17 @@ Status FragmentConsolidator::consolidate_fragments(
463463

464464
// Consolidate the selected fragments
465465
URI new_fragment_uri;
466-
try {
467-
consolidate_internal(
468-
array_for_reads,
469-
array_for_writes,
470-
to_consolidate,
471-
union_non_empty_domains,
472-
&new_fragment_uri);
473-
} catch (...) {
466+
st = consolidate_internal(
467+
array_for_reads,
468+
array_for_writes,
469+
to_consolidate,
470+
union_non_empty_domains,
471+
&new_fragment_uri);
472+
if (!st.ok()) {
473+
std::cerr << "FAILED: " << st.message() << std::endl;
474474
throw_if_not_ok(array_for_reads->close());
475475
throw_if_not_ok(array_for_writes->close());
476-
throw;
476+
return st;
477477
}
478478

479479
// Load info of the consolidated fragment and add it
@@ -572,7 +572,7 @@ bool FragmentConsolidator::are_consolidatable(
572572
return (double(union_cell_num) / sum_cell_num) <= config_.amplification_;
573573
}
574574

575-
void FragmentConsolidator::consolidate_internal(
575+
Status FragmentConsolidator::consolidate_internal(
576576
shared_ptr<Array> array_for_reads,
577577
shared_ptr<Array> array_for_writes,
578578
const std::vector<TimestampedURI>& to_consolidate,
@@ -583,7 +583,7 @@ void FragmentConsolidator::consolidate_internal(
583583
array_for_reads->load_fragments(to_consolidate);
584584

585585
if (array_for_reads->is_empty()) {
586-
return;
586+
return Status::Ok();
587587
}
588588

589589
// Get schema
@@ -627,7 +627,7 @@ void FragmentConsolidator::consolidate_internal(
627627
// Create queries
628628
tdb_unique_ptr<Query> query_r = nullptr;
629629
tdb_unique_ptr<Query> query_w = nullptr;
630-
throw_if_not_ok(create_queries(
630+
RETURN_NOT_OK(create_queries(
631631
array_for_reads,
632632
array_for_writes,
633633
union_non_empty_domains,
@@ -649,41 +649,46 @@ void FragmentConsolidator::consolidate_internal(
649649

650650
// Read from one array and write to the other
651651
copy_array(
652-
query_r.get(),
653-
query_w.get(),
652+
std::move(query_r),
653+
std::move(query_w),
654654
array_schema,
655-
array_for_reads->get_average_var_cell_sizes());
655+
array_for_reads->get_average_var_cell_sizes(),
656+
new_fragment_uri);
656657

657-
// Finalize write query
658-
auto st = query_w->finalize();
659-
if (!st.ok()) {
660-
if (resources_.vfs().is_dir(*new_fragment_uri))
661-
resources_.vfs().remove_dir(*new_fragment_uri);
662-
throw FragmentConsolidatorException(st.message());
663-
}
658+
// // Finalize write query
659+
// auto st = query_w->finalize();
660+
// if (!st.ok()) {
661+
// if (resources_.vfs().is_dir(*new_fragment_uri))
662+
// resources_.vfs().remove_dir(*new_fragment_uri);
663+
// return st;
664+
// }
664665

665666
// Write vacuum file
666-
st = write_vacuum_file(
667+
auto st = write_vacuum_file(
667668
array_for_reads->array_schema_latest().write_version(),
668669
array_for_reads->array_uri(),
669670
vac_uri,
670671
to_consolidate);
671672
if (!st.ok()) {
672673
if (resources_.vfs().is_dir(*new_fragment_uri))
673674
resources_.vfs().remove_dir(*new_fragment_uri);
674-
throw FragmentConsolidatorException(st.message());
675+
return st;
675676
}
677+
return st;
676678
}
677679

678680
void FragmentConsolidator::copy_array(
679-
Query* query_r,
680-
Query* query_w,
681+
shared_ptr<Query> query_r,
682+
shared_ptr<Query> query_w,
681683
const ArraySchema& reader_array_schema_latest,
682-
std::unordered_map<std::string, uint64_t> average_var_cell_sizes) {
684+
std::unordered_map<std::string, uint64_t> average_var_cell_sizes,
685+
URI* new_fragment_uri) {
683686
// The size of the buffers.
684-
uint64_t buffer_size = 10485760; // 10 MB
685-
uint64_t initial_buffer_size =
686-
buffer_size; // initial, "ungrown", value of `buffer_size`.
687+
uint64_t buffer_size = std::min(
688+
(uint64_t)10485760,
689+
config_.total_budget_); // 10 MB, or total_mem_budget if smaller.
690+
// Initial, "ungrown", value of `buffer_size`.
691+
const uint64_t initial_buffer_size = buffer_size;
687692

688693
// Deque which stores the buffers passed between the reader and writer.
689694
// Cannot exceed `config_.total_budget_`.
@@ -715,7 +720,7 @@ void FragmentConsolidator::copy_array(
715720
reader_array_schema_latest,
716721
average_var_cell_sizes,
717722
buffer_size);
718-
set_query_buffers(query_r, *cw.get());
723+
set_query_buffers(query_r.get(), *cw.get());
719724
throw_if_not_ok(query_r->submit());
720725

721726
// Only continue if Consolidation can make progress. The first buffer
@@ -731,7 +736,8 @@ void FragmentConsolidator::copy_array(
731736
// If it's not the first read, grow the buffer and try again.
732737
buffer_size += std::min(
733738
config_.total_budget_ - allocated_buffer_size, (2 * buffer_size));
734-
continue;
739+
// continue; -> we want to try again, but need to append
740+
// allocated_buffer_size & potentially wait on writer
735741
} else {
736742
buffer_queue.push(cw);
737743
}
@@ -777,16 +783,28 @@ void FragmentConsolidator::copy_array(
777783
try {
778784
// Explicitly set the write query buffers, as the sizes may have
779785
// been altered by the read query.
780-
set_query_buffers(query_w, *writebuf.get());
786+
set_query_buffers(query_w.get(), *writebuf.get());
781787
throw_if_not_ok(query_w->submit());
782788
allocated_buffer_size -= buffer_size;
783789
} catch (...) {
784790
reading = false; // Stop the reader.
791+
throw_if_not_ok(read_task.wait());
792+
if (resources_.vfs().is_dir(*new_fragment_uri)) {
793+
resources_.vfs().remove_dir(*new_fragment_uri);
794+
}
785795
throw;
786796
}
787797
}
788798

789799
throw_if_not_ok(read_task.wait());
800+
801+
// Finalize write query
802+
auto st = query_w->finalize();
803+
if (!st.ok()) {
804+
if (resources_.vfs().is_dir(*new_fragment_uri))
805+
resources_.vfs().remove_dir(*new_fragment_uri);
806+
throw st.message();
807+
}
790808
}
791809

792810
Status FragmentConsolidator::create_queries(

tiledb/sm/consolidator/fragment_consolidator.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,10 @@ class FragmentConsolidator : public Consolidator {
293293
* @param new_fragment_uri The URI of the fragment created after
294294
* consolidating the `to_consolidate` fragments.
295295
* @param cw A workspace containing buffers for the queries
296+
*
297+
* @return Status
296298
*/
297-
void consolidate_internal(
299+
Status consolidate_internal(
298300
shared_ptr<Array> array_for_reads,
299301
shared_ptr<Array> array_for_writes,
300302
const std::vector<TimestampedURI>& to_consolidate,
@@ -311,12 +313,15 @@ class FragmentConsolidator : public Consolidator {
311313
* @param reader_array_schema_latest The reader's latest array schema.
312314
* @param avg_var_cell_sizes A map of the reader's computed average cell size
313315
* for var size attrs / dims.
316+
* @param new_fragment_uri The URI of the fragment created after
317+
* consolidating the `to_consolidate` fragments.
314318
*/
315319
void copy_array(
316-
Query* query_r,
317-
Query* query_w,
320+
shared_ptr<Query> query_r,
321+
shared_ptr<Query> query_w,
318322
const ArraySchema& reader_array_schema_latest,
319-
std::unordered_map<std::string, uint64_t> avg_var_cell_sizes);
323+
std::unordered_map<std::string, uint64_t> avg_var_cell_sizes,
324+
URI* new_fragment_uri);
320325

321326
/**
322327
* Creates the queries needed for consolidation. It also retrieves

0 commit comments

Comments
 (0)