@@ -248,8 +248,6 @@ Status FragmentConsolidator::consolidate(
248248 return st;
249249 }
250250
251- FragmentConsolidationWorkspace cw (consolidator_memory_tracker_);
252-
253251 uint32_t step = 0 ;
254252 std::vector<TimestampedURI> to_consolidate;
255253 do {
@@ -277,17 +275,17 @@ Status FragmentConsolidator::consolidate(
277275
278276 // Consolidate the selected fragments
279277 URI new_fragment_uri;
280- st = consolidate_internal (
281- array_for_reads,
282- array_for_writes ,
283- to_consolidate ,
284- union_non_empty_domains ,
285- &new_fragment_uri ,
286- cw );
287- if (!st. ok () ) {
278+ try {
279+ consolidate_internal (
280+ array_for_reads ,
281+ array_for_writes ,
282+ to_consolidate ,
283+ union_non_empty_domains ,
284+ &new_fragment_uri );
285+ } catch (... ) {
288286 throw_if_not_ok (array_for_reads->close ());
289287 throw_if_not_ok (array_for_writes->close ());
290- return st ;
288+ throw ;
291289 }
292290
293291 // Load info of the consolidated fragment and add it
@@ -463,21 +461,19 @@ Status FragmentConsolidator::consolidate_fragments(
463461 }
464462 }
465463
466- FragmentConsolidationWorkspace cw (consolidator_memory_tracker_);
467-
468464 // Consolidate the selected fragments
469465 URI new_fragment_uri;
470- st = consolidate_internal (
471- array_for_reads,
472- array_for_writes ,
473- to_consolidate ,
474- union_non_empty_domains ,
475- &new_fragment_uri ,
476- cw );
477- if (!st. ok () ) {
466+ try {
467+ consolidate_internal (
468+ array_for_reads ,
469+ array_for_writes ,
470+ to_consolidate ,
471+ union_non_empty_domains ,
472+ &new_fragment_uri );
473+ } catch (... ) {
478474 throw_if_not_ok (array_for_reads->close ());
479475 throw_if_not_ok (array_for_writes->close ());
480- return st ;
476+ throw ;
481477 }
482478
483479 // Load info of the consolidated fragment and add it
@@ -576,19 +572,18 @@ bool FragmentConsolidator::are_consolidatable(
576572 return (double (union_cell_num) / sum_cell_num) <= config_.amplification_ ;
577573}
578574
579- Status FragmentConsolidator::consolidate_internal (
575+ void FragmentConsolidator::consolidate_internal (
580576 shared_ptr<Array> array_for_reads,
581577 shared_ptr<Array> array_for_writes,
582578 const std::vector<TimestampedURI>& to_consolidate,
583579 const NDRange& union_non_empty_domains,
584- URI* new_fragment_uri,
585- FragmentConsolidationWorkspace& cw) {
580+ URI* new_fragment_uri) {
586581 auto timer_se = stats_->start_timer (" consolidate_internal" );
587582
588583 array_for_reads->load_fragments (to_consolidate);
589584
590585 if (array_for_reads->is_empty ()) {
591- return Status::Ok () ;
586+ return ;
592587 }
593588
594589 // Get schema
@@ -622,20 +617,8 @@ Status FragmentConsolidator::consolidate_internal(
622617 }
623618 }
624619
625- // Compute memory budgets
626- uint64_t total_weights =
627- config_.buffers_weight_ + config_.reader_weight_ + config_.writer_weight_ ;
628- uint64_t single_unit_budget = config_.total_budget_ / total_weights;
629- uint64_t buffers_budget = config_.buffers_weight_ * single_unit_budget;
630- uint64_t reader_budget = config_.reader_weight_ * single_unit_budget;
631- uint64_t writer_budget = config_.writer_weight_ * single_unit_budget;
632-
633- // Prepare buffers
634- auto average_var_cell_sizes = array_for_reads->get_average_var_cell_sizes ();
635- cw.resize_buffers (
636- stats_, config_, array_schema, average_var_cell_sizes, buffers_budget);
637-
638620 // Create queries
621+ uint64_t buffer_size = 10485760 ; // 10 MB
639622 tdb_unique_ptr<Query> query_r = nullptr ;
640623 tdb_unique_ptr<Query> query_w = nullptr ;
641624 throw_if_not_ok (create_queries (
@@ -645,28 +628,33 @@ Status FragmentConsolidator::consolidate_internal(
645628 query_r,
646629 query_w,
647630 new_fragment_uri,
648- reader_budget ,
649- writer_budget ));
631+ buffer_size ,
632+ buffer_size ));
650633
651634 // Get the vacuum URI
652635 URI vac_uri;
653636 try {
654637 vac_uri =
655638 array_for_reads->array_directory ().get_vacuum_uri (*new_fragment_uri);
656639 } catch (std::exception& e) {
657- FragmentConsolidatorException (
640+ throw FragmentConsolidatorException (
658641 " Internal consolidation failed with exception" + std::string (e.what ()));
659642 }
660643
661644 // Read from one array and write to the other
662- copy_array (query_r.get (), query_w.get (), cw);
645+ copy_array (
646+ query_r.get (),
647+ query_w.get (),
648+ array_schema,
649+ array_for_reads->get_average_var_cell_sizes (),
650+ buffer_size);
663651
664652 // Finalize write query
665653 auto st = query_w->finalize ();
666654 if (!st.ok ()) {
667655 if (resources_.vfs ().is_dir (*new_fragment_uri))
668656 resources_.vfs ().remove_dir (*new_fragment_uri);
669- return st ;
657+ throw FragmentConsolidatorException (st. message ()) ;
670658 }
671659
672660 // Write vacuum file
@@ -678,42 +666,110 @@ Status FragmentConsolidator::consolidate_internal(
678666 if (!st.ok ()) {
679667 if (resources_.vfs ().is_dir (*new_fragment_uri))
680668 resources_.vfs ().remove_dir (*new_fragment_uri);
681- return st ;
669+ throw FragmentConsolidatorException (st. message ()) ;
682670 }
683-
684- return st;
685671}
686672
687673void FragmentConsolidator::copy_array (
688- Query* query_r, Query* query_w, FragmentConsolidationWorkspace& cw) {
689- auto timer_se = stats_->start_timer (" consolidate_copy_array" );
674+ Query* query_r,
675+ Query* query_w,
676+ const ArraySchema& reader_array_schema_latest,
677+ std::unordered_map<std::string, uint64_t > average_var_cell_sizes,
678+ uint64_t buffer_size) {
679+ // The maximum number of buffers the reader may allocate.
680+ uint64_t max_buffer_count = 10 ;
681+
682+ // Deque which stores the buffers passed between the reader and writer. Cannot
683+ // exceed size `max_buffer_count`.
684+ ProducerConsumerQueue<std::variant<
685+ tdb_shared_ptr<FragmentConsolidationWorkspace>,
686+ std::exception_ptr>>
687+ buffer_queue;
688+
689+ // Atomic counter of the queue buffers allocated by the reader.
690+ // May not exceed `max_buffer_count`.
691+ std::atomic<uint64_t > buffer_count = 0 ;
692+
693+ // Flag indicating an ongoing read. The reader will stop once set to `false`.
694+ std::atomic<bool > reading = true ;
695+
696+ // Reader
697+ auto & io_tp = resources_.io_tp ();
698+ ThreadPool::Task read_task = io_tp.execute ([&] {
699+ while (reading) {
700+ tdb_shared_ptr<FragmentConsolidationWorkspace> cw =
701+ tdb::make_shared<FragmentConsolidationWorkspace>(
702+ HERE (), consolidator_memory_tracker_);
703+ // READ
704+ try {
705+ // Set the read query buffers.
706+ cw->resize_buffers (
707+ stats_,
708+ config_,
709+ reader_array_schema_latest,
710+ average_var_cell_sizes,
711+ buffer_size);
712+ set_query_buffers (query_r, *cw.get ());
713+ throw_if_not_ok (query_r->submit ());
714+
715+ // Only continue if Consolidation can make progress. The first buffer
716+ // will always contain fixed size data, whether it is tile offsets for
717+ // var size attribute/dimension or the actual fixed size data so we can
718+ // use its size to know if any cells were written or not.
719+ if (cw->sizes ().at (0 ) > 0 ) {
720+ buffer_queue.push (cw);
721+ }
690722
691- // Set the read query buffers outside the repeated submissions.
692- // The Reader will reset the query buffer sizes to the original
693- // sizes, not the potentially smaller sizes of the results after
694- // the query submission.
695- set_query_buffers (query_r, cw);
723+ // Once the read is complete, drain the queue and exit the reader.
724+ // Note: drain() shuts down the queue without removing elements.
725+ // The write fiber will be notified and write the remaining chunks.
726+ if (query_r->status () != QueryStatus::INCOMPLETE) {
727+ buffer_queue.drain ();
728+ reading = false ;
729+ break ;
730+ }
731+ } catch (...) {
732+ // Enqueue caught-exceptions to be handled by the writer.
733+ buffer_queue.push (std::current_exception ());
734+ reading = false ;
735+ }
736+ buffer_count++;
696737
697- do {
698- // READ
699- throw_if_not_ok (query_r->submit ());
700-
701- // If Consolidation cannot make any progress, throw. The first buffer will
702- // always contain fixed size data, whether it is tile offsets for var size
703- // attribute/dimension or the actual fixed size data so we can use its size
704- // to know if any cells were written or not.
705- if (cw.sizes ().at (0 ) == 0 ) {
706- throw FragmentConsolidatorException (
707- " Consolidation read 0 cells, no progress can be made" );
738+ io_tp.wait_until ([&]() { return buffer_count < max_buffer_count; });
739+ }
740+ return Status::Ok ();
741+ });
742+
743+ // Writer
744+ while (true ) {
745+ // Allow ProducerConsumerQueue to wait for an element to be enqueued.
746+ auto buffer_queue_element = buffer_queue.pop_back ();
747+ if (!buffer_queue_element.has_value ()) {
748+ // Stop writing once the queue is empty
749+ break ;
708750 }
709751
710- // Set explicitly the write query buffers, as the sizes may have
711- // been altered by the read query.
712- set_query_buffers (query_w, cw);
752+ auto & buffer = buffer_queue_element.value ();
753+ // Rethrow read-enqueued exceptions.
754+ if (std::holds_alternative<std::exception_ptr>(buffer)) {
755+ std::rethrow_exception (std::get<std::exception_ptr>(buffer));
756+ }
713757
714758 // WRITE
715- throw_if_not_ok (query_w->submit ());
716- } while (query_r->status () == QueryStatus::INCOMPLETE);
759+ auto & writebuf = std::get<0 >(buffer);
760+ try {
761+ // Explicitly set the write query buffers, as the sizes may have
762+ // been altered by the read query.
763+ set_query_buffers (query_w, *writebuf.get ());
764+ throw_if_not_ok (query_w->submit ());
765+ buffer_count--;
766+ } catch (...) {
767+ reading = false ; // Stop the reader.
768+ throw ;
769+ }
770+ }
771+
772+ throw_if_not_ok (read_task.wait ());
717773}
718774
719775Status FragmentConsolidator::create_queries (
0 commit comments