@@ -617,8 +617,14 @@ void FragmentConsolidator::consolidate_internal(
617617 }
618618 }
619619
620+ // Compute memory budgets
621+ uint64_t total_weights =
622+ config_.buffers_weight_ + config_.reader_weight_ + config_.writer_weight_ ;
623+ uint64_t single_unit_budget = config_.total_budget_ / total_weights;
624+ uint64_t reader_budget = config_.reader_weight_ * single_unit_budget;
625+ uint64_t writer_budget = config_.writer_weight_ * single_unit_budget;
626+
620627 // Create queries
621- uint64_t buffer_size = 10485760 ; // 10 MB
622628 tdb_unique_ptr<Query> query_r = nullptr ;
623629 tdb_unique_ptr<Query> query_w = nullptr ;
624630 throw_if_not_ok (create_queries (
@@ -628,8 +634,8 @@ void FragmentConsolidator::consolidate_internal(
628634 query_r,
629635 query_w,
630636 new_fragment_uri,
631- buffer_size ,
632- buffer_size ));
637+ reader_budget ,
638+ writer_budget ));
633639
634640 // Get the vacuum URI
635641 URI vac_uri;
@@ -646,8 +652,7 @@ void FragmentConsolidator::consolidate_internal(
646652 query_r.get (),
647653 query_w.get (),
648654 array_schema,
649- array_for_reads->get_average_var_cell_sizes (),
650- buffer_size);
655+ array_for_reads->get_average_var_cell_sizes ());
651656
652657 // Finalize write query
653658 auto st = query_w->finalize ();
@@ -674,21 +679,22 @@ void FragmentConsolidator::copy_array(
674679 Query* query_r,
675680 Query* query_w,
676681 const ArraySchema& reader_array_schema_latest,
677- std::unordered_map<std::string, uint64_t > average_var_cell_sizes,
678- uint64_t buffer_size) {
679- // The maximum number of buffers the reader may allocate.
680- uint64_t max_buffer_count = 10 ;
682+ std::unordered_map<std::string, uint64_t > average_var_cell_sizes) {
683+ // The size of the buffers.
684+ uint64_t buffer_size = 10485760 ; // 10 MB
685+ uint64_t initial_buffer_size =
686+ buffer_size; // initial, "ungrown", value of `buffer_size`.
681687
682- // Deque which stores the buffers passed between the reader and writer. Cannot
683- // exceed size `max_buffer_count `.
688+ // Deque which stores the buffers passed between the reader and writer.
689+ // Cannot exceed `config_.total_budget_ `.
684690 ProducerConsumerQueue<std::variant<
685691 tdb_shared_ptr<FragmentConsolidationWorkspace>,
686692 std::exception_ptr>>
687693 buffer_queue;
688694
689695 // Atomic counter of the queue buffers allocated by the reader.
690- // May not exceed `max_buffer_count `.
691- std::atomic<uint64_t > buffer_count = 0 ;
696+ // May not exceed `config_.total_budget_ `.
697+ std::atomic<uint64_t > allocated_buffer_size = 0 ;
692698
693699 // Flag indicating an ongoing read. The reader will stop once set to `false`.
694700 std::atomic<bool > reading = true ;
@@ -716,7 +722,17 @@ void FragmentConsolidator::copy_array(
716722 // will always contain fixed size data, whether it is tile offsets for
717723 // var size attribute/dimension or the actual fixed size data so we can
718724 // use its size to know if any cells were written or not.
719- if (cw->sizes ().at (0 ) > 0 ) {
725+ if (cw->sizes ().at (0 ) == 0 ) {
726+ if (buffer_size == initial_buffer_size) {
727+ // If the first read failed, throw.
728+ throw FragmentConsolidatorException (
729+ " Consolidation read 0 cells, no progress can be made" );
730+ }
731+ // If it's not the first read, grow the buffer and try again.
732+ buffer_size += std::min (
733+ config_.total_budget_ - allocated_buffer_size, (2 * buffer_size));
734+ continue ;
735+ } else {
720736 buffer_queue.push (cw);
721737 }
722738
@@ -733,9 +749,10 @@ void FragmentConsolidator::copy_array(
733749 buffer_queue.push (std::current_exception ());
734750 reading = false ;
735751 }
736- buffer_count++ ;
752+ allocated_buffer_size += buffer_size ;
737753
738- io_tp.wait_until ([&]() { return buffer_count < max_buffer_count; });
754+ io_tp.wait_until (
755+ [&]() { return allocated_buffer_size < config_.total_budget_ ; });
739756 }
740757 return Status::Ok ();
741758 });
@@ -762,7 +779,7 @@ void FragmentConsolidator::copy_array(
762779 // been altered by the read query.
763780 set_query_buffers (query_w, *writebuf.get ());
764781 throw_if_not_ok (query_w->submit ());
765- buffer_count-- ;
782+ allocated_buffer_size -= buffer_size ;
766783 } catch (...) {
767784 reading = false ; // Stop the reader.
768785 throw ;
0 commit comments