@@ -470,7 +470,6 @@ Status FragmentConsolidator::consolidate_fragments(
470470 union_non_empty_domains,
471471 &new_fragment_uri);
472472 if (!st.ok ()) {
473- std::cerr << " FAILED: " << st.message () << std::endl;
474473 throw_if_not_ok (array_for_reads->close ());
475474 throw_if_not_ok (array_for_writes->close ());
476475 return st;
@@ -647,48 +646,48 @@ Status FragmentConsolidator::consolidate_internal(
647646 " Internal consolidation failed with exception" + std::string (e.what ()));
648647 }
649648
650- // Read from one array and write to the other
651- copy_array (
652- std::move (query_r),
653- std::move (query_w),
654- array_schema,
655- array_for_reads->get_average_var_cell_sizes (),
656- new_fragment_uri);
657-
658- // // Finalize write query
659- // auto st = query_w->finalize();
660- // if (!st.ok()) {
661- // if (resources_.vfs().is_dir(*new_fragment_uri))
662- // resources_.vfs().remove_dir(*new_fragment_uri);
663- // return st;
664- // }
665-
666- // Write vacuum file
667- auto st = write_vacuum_file (
668- array_for_reads->array_schema_latest ().write_version (),
669- array_for_reads->array_uri (),
670- vac_uri,
671- to_consolidate);
672- if (!st.ok ()) {
673- if (resources_.vfs ().is_dir (*new_fragment_uri))
649+ // Consolidate fragments
650+ try {
651+ // Read from one array and write to the other
652+ copy_array (
653+ query_r.get (),
654+ query_w.get (),
655+ array_schema,
656+ array_for_reads->get_average_var_cell_sizes ());
657+ // Write vacuum file
658+ throw_if_not_ok (write_vacuum_file (
659+ array_for_reads->array_schema_latest ().write_version (),
660+ array_for_reads->array_uri (),
661+ vac_uri,
662+ to_consolidate));
663+ } catch (...) {
664+ if (resources_.vfs ().is_dir (*new_fragment_uri)) {
674665 resources_.vfs ().remove_dir (*new_fragment_uri);
675- return st;
666+ }
667+ std::rethrow_exception (std::current_exception ());
676668 }
677- return st;
669+
670+ return Status::Ok ();
678671}
679672
680673void FragmentConsolidator::copy_array (
681- shared_ptr< Query> query_r,
682- shared_ptr< Query> query_w,
674+ Query* query_r,
675+ Query* query_w,
683676 const ArraySchema& reader_array_schema_latest,
684- std::unordered_map<std::string, uint64_t > average_var_cell_sizes,
685- URI* new_fragment_uri) {
677+ std::unordered_map<std::string, uint64_t > average_var_cell_sizes) {
686678 // The size of the buffers.
687- uint64_t buffer_size = std::min (
688- (uint64_t )10485760 ,
689- config_.total_budget_ ); // 10 MB, or total_mem_budget if smaller.
679+ // 10MB by default, unless total_budget_ is smaller, or buffer_size_ is set.
680+ uint64_t buffer_size =
681+ config_.buffer_size_ != 0 ?
682+ config_.buffer_size_ :
683+ std::min ((uint64_t )10485760 , config_.total_budget_ );
690684 // Initial, "ungrown", value of `buffer_size`.
691685 const uint64_t initial_buffer_size = buffer_size;
686+ if (buffer_size > config_.total_budget_ ) {
687+ throw FragmentConsolidatorException (
688+ " Consolidation cannot proceed without disrespecting the memory "
689+ " budget." );
690+ }
692691
693692 // Deque which stores the buffers passed between the reader and writer.
694693 // Cannot exceed `config_.total_budget_`.
@@ -720,7 +719,7 @@ void FragmentConsolidator::copy_array(
720719 reader_array_schema_latest,
721720 average_var_cell_sizes,
722721 buffer_size);
723- set_query_buffers (query_r. get () , *cw.get ());
722+ set_query_buffers (query_r, *cw.get ());
724723 throw_if_not_ok (query_r->submit ());
725724
726725 // Only continue if Consolidation can make progress. The first buffer
@@ -736,8 +735,6 @@ void FragmentConsolidator::copy_array(
736735 // If it's not the first read, grow the buffer and try again.
737736 buffer_size += std::min (
738737 config_.total_budget_ - allocated_buffer_size, (2 * buffer_size));
739- // continue; -> we want to try again, but need to append
740- // allocated_buffer_size & potentially wait on writer
741738 } else {
742739 buffer_queue.push (cw);
743740 }
@@ -753,7 +750,10 @@ void FragmentConsolidator::copy_array(
753750 } catch (...) {
754751 // Enqueue caught-exceptions to be handled by the writer.
755752 buffer_queue.push (std::current_exception ());
753+ allocated_buffer_size++; // increase buffer size to maintain queue
754+ // logic
756755 reading = false ;
756+ break ;
757757 }
758758 allocated_buffer_size += buffer_size;
759759
@@ -783,28 +783,21 @@ void FragmentConsolidator::copy_array(
783783 try {
784784 // Explicitly set the write query buffers, as the sizes may have
785785 // been altered by the read query.
786- set_query_buffers (query_w. get () , *writebuf.get ());
786+ set_query_buffers (query_w, *writebuf.get ());
787787 throw_if_not_ok (query_w->submit ());
788788 allocated_buffer_size -= buffer_size;
789789 } catch (...) {
790790 reading = false ; // Stop the reader.
791791 throw_if_not_ok (read_task.wait ());
792- if (resources_.vfs ().is_dir (*new_fragment_uri)) {
793- resources_.vfs ().remove_dir (*new_fragment_uri);
794- }
795792 throw ;
796793 }
797794 }
798795
796+ // Wait for reader to finish
799797 throw_if_not_ok (read_task.wait ());
800798
801799 // Finalize write query
802- auto st = query_w->finalize ();
803- if (!st.ok ()) {
804- if (resources_.vfs ().is_dir (*new_fragment_uri))
805- resources_.vfs ().remove_dir (*new_fragment_uri);
806- throw st.message ();
807- }
800+ throw_if_not_ok (query_w->finalize ());
808801}
809802
810803Status FragmentConsolidator::create_queries (
0 commit comments