diff --git a/test/src/unit-capi-consolidation.cc b/test/src/unit-capi-consolidation.cc index e1907946285..d31b78244be 100644 --- a/test/src/unit-capi-consolidation.cc +++ b/test/src/unit-capi-consolidation.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2017-2021 TileDB Inc. + * @copyright Copyright (c) 2017-2025 TileDB Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -7589,23 +7589,82 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( ConsolidationFx, - "C API: Test consolidation, sparse string, no progress", - "[capi][consolidation][sparse][string][no-progress][non-rest]") { + "C API: Test sparse fragment consolidation", + "[capi][consolidation][fragment][sparse][non-rest]") { remove_sparse_string_array(); create_sparse_string_array(); - write_sparse_string_full(); write_sparse_string_unordered(); - consolidate_sparse_string(1, true); - tiledb_error_t* err = NULL; - tiledb_ctx_get_last_error(ctx_, &err); + SECTION("success") { + // Write large, 25MB chunk which outsizes the default buffer size of 10MB + const std::string test_chars = "abcdefghijklmnopqrstuvwxyz"; + uint64_t test_str_size = 26214400; + std::string test_str; + test_str.reserve(test_str_size); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist(0, test_chars.length() - 1); + for (size_t i = 0; i < test_str_size; ++i) { + test_str += test_chars[dist(gen)]; + } + REQUIRE(test_str.size() == test_str_size); + + // Consolidate + tiledb_config_t* cfg; + tiledb_error_t* err = nullptr; + int rc = tiledb_config_alloc(&cfg, &err); + REQUIRE(rc == TILEDB_OK); + REQUIRE(err == nullptr); + + // Consolidate + if (encryption_type_ != TILEDB_NO_ENCRYPTION) { + std::string encryption_type_string = + encryption_type_str((tiledb::sm::EncryptionType)encryption_type_); + rc = tiledb_config_set( + cfg, "sm.encryption_type", encryption_type_string.c_str(), &err); + REQUIRE(err == nullptr); + rc = tiledb_config_set(cfg, "sm.encryption_key", encryption_key_, &err); + REQUIRE(rc == TILEDB_OK); + REQUIRE(err == nullptr); + rc = + tiledb_array_consolidate(ctx_, sparse_string_array_uri_.c_str(), cfg); + tiledb_config_free(&cfg); + } else { + rc = + tiledb_array_consolidate(ctx_, sparse_string_array_uri_.c_str(), cfg); + } + tiledb_config_free(&cfg); + REQUIRE(rc == TILEDB_OK); + } + + SECTION("no progress") { + uint64_t string_size = 1; + std::string errmsg = ""; - const char* msg; - tiledb_error_message(err, &msg); - CHECK( - std::string("FragmentConsolidator: Consolidation read 0 cells, no " - "progress can be made") == msg); + DYNAMIC_SECTION("too small") { + string_size = 1; + errmsg = + "FragmentConsolidator: Consolidation read 0 cells, no " + "progress can be made"; + } + + DYNAMIC_SECTION("too large") { + string_size = 10737418240 + 2; // over default memory budget of 10GB + errmsg = + "FragmentConsolidator: Consolidation cannot proceed without " + "disrespecting the memory budget."; + } + + consolidate_sparse_string(string_size, true); + + tiledb_error_t* err = NULL; + tiledb_ctx_get_last_error(ctx_, &err); + + const char* msg; + tiledb_error_message(err, &msg); + CHECK(errmsg == msg); + } remove_sparse_string_array(); } diff --git a/tiledb/sm/consolidator/fragment_consolidator.cc b/tiledb/sm/consolidator/fragment_consolidator.cc index 51b384483c2..d444bd865e6 100644 --- a/tiledb/sm/consolidator/fragment_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_consolidator.cc @@ -47,6 +47,7 @@ #include #include #include +#include using namespace tiledb::common; @@ -248,8 +249,6 @@ Status FragmentConsolidator::consolidate( return st; } - FragmentConsolidationWorkspace cw(consolidator_memory_tracker_); - uint32_t step = 0; std::vector to_consolidate; do { @@ -282,8 +281,7 @@ Status FragmentConsolidator::consolidate( array_for_writes, to_consolidate, union_non_empty_domains, - &new_fragment_uri, - cw); + &new_fragment_uri); if (!st.ok()) { throw_if_not_ok(array_for_reads->close()); throw_if_not_ok(array_for_writes->close()); @@ -463,8 +461,6 @@ Status FragmentConsolidator::consolidate_fragments( } } - FragmentConsolidationWorkspace cw(consolidator_memory_tracker_); - // Consolidate the selected fragments URI new_fragment_uri; st = consolidate_internal( @@ -472,8 +468,7 @@ Status FragmentConsolidator::consolidate_fragments( array_for_writes, to_consolidate, union_non_empty_domains, - &new_fragment_uri, - cw); + &new_fragment_uri); if (!st.ok()) { throw_if_not_ok(array_for_reads->close()); throw_if_not_ok(array_for_writes->close()); @@ -581,8 +576,7 @@ Status FragmentConsolidator::consolidate_internal( shared_ptr array_for_writes, const std::vector& to_consolidate, const NDRange& union_non_empty_domains, - URI* new_fragment_uri, - FragmentConsolidationWorkspace& cw) { + URI* new_fragment_uri) { auto timer_se = stats_->start_timer("consolidate_internal"); array_for_reads->load_fragments(to_consolidate); @@ -626,19 +620,13 @@ Status FragmentConsolidator::consolidate_internal( uint64_t total_weights = config_.buffers_weight_ + config_.reader_weight_ + config_.writer_weight_; uint64_t single_unit_budget = config_.total_budget_ / total_weights; - uint64_t buffers_budget = config_.buffers_weight_ * single_unit_budget; uint64_t reader_budget = config_.reader_weight_ * single_unit_budget; uint64_t writer_budget = config_.writer_weight_ * single_unit_budget; - // Prepare buffers - auto average_var_cell_sizes = array_for_reads->get_average_var_cell_sizes(); - cw.resize_buffers( - stats_, config_, array_schema, average_var_cell_sizes, buffers_budget); - // Create queries tdb_unique_ptr query_r = nullptr; tdb_unique_ptr query_w = nullptr; - throw_if_not_ok(create_queries( + RETURN_NOT_OK(create_queries( array_for_reads, array_for_writes, union_non_empty_domains, @@ -654,66 +642,162 @@ Status FragmentConsolidator::consolidate_internal( vac_uri = array_for_reads->array_directory().get_vacuum_uri(*new_fragment_uri); } catch (std::exception& e) { - FragmentConsolidatorException( + throw FragmentConsolidatorException( "Internal consolidation failed with exception" + std::string(e.what())); } - // Read from one array and write to the other - copy_array(query_r.get(), query_w.get(), cw); - - // Finalize write query - auto st = query_w->finalize(); - if (!st.ok()) { - if (resources_.vfs().is_dir(*new_fragment_uri)) - resources_.vfs().remove_dir(*new_fragment_uri); - return st; - } - - // Write vacuum file - st = write_vacuum_file( - array_for_reads->array_schema_latest().write_version(), - array_for_reads->array_uri(), - vac_uri, - to_consolidate); - if (!st.ok()) { - if (resources_.vfs().is_dir(*new_fragment_uri)) + // Consolidate fragments + try { + // Read from one array and write to the other + copy_array( + query_r.get(), + query_w.get(), + array_schema, + array_for_reads->get_average_var_cell_sizes()); + // Write vacuum file + throw_if_not_ok(write_vacuum_file( + array_for_reads->array_schema_latest().write_version(), + array_for_reads->array_uri(), + vac_uri, + to_consolidate)); + } catch (...) { + if (resources_.vfs().is_dir(*new_fragment_uri)) { resources_.vfs().remove_dir(*new_fragment_uri); - return st; + } + std::rethrow_exception(std::current_exception()); } - return st; + return Status::Ok(); } void FragmentConsolidator::copy_array( - Query* query_r, Query* query_w, FragmentConsolidationWorkspace& cw) { - auto timer_se = stats_->start_timer("consolidate_copy_array"); + Query* query_r, + Query* query_w, + const ArraySchema& reader_array_schema_latest, + std::unordered_map average_var_cell_sizes) { + // The size of the buffers. + // 10MB by default, unless total_budget_ is smaller, or buffer_size_ is set. + uint64_t buffer_size = + config_.buffer_size_ != 0 ? + config_.buffer_size_ : + std::min((uint64_t)10485760, config_.total_budget_); + // Initial, "ungrown", value of `buffer_size`. + const uint64_t initial_buffer_size = buffer_size; + if (buffer_size > config_.total_budget_) { + throw FragmentConsolidatorException( + "Consolidation cannot proceed without disrespecting the memory " + "budget."); + } - // Set the read query buffers outside the repeated submissions. - // The Reader will reset the query buffer sizes to the original - // sizes, not the potentially smaller sizes of the results after - // the query submission. - set_query_buffers(query_r, cw); + // Deque which stores the buffers passed between the reader and writer. + // Cannot exceed `config_.total_budget_`. + ProducerConsumerQueue, + std::exception_ptr>> + buffer_queue; + + // Atomic counter of the queue buffers allocated by the reader. + // May not exceed `config_.total_budget_`. + std::atomic allocated_buffer_size = 0; + + // Flag indicating an ongoing read. The reader will stop once set to `false`. + std::atomic reading = true; + + // Reader + auto& io_tp = resources_.io_tp(); + ThreadPool::Task read_task = io_tp.execute([&] { + while (reading) { + tdb_shared_ptr cw = + tdb::make_shared( + HERE(), consolidator_memory_tracker_); + // READ + try { + // Set the read query buffers. + cw->resize_buffers( + stats_, + config_, + reader_array_schema_latest, + average_var_cell_sizes, + buffer_size); + set_query_buffers(query_r, *cw.get()); + throw_if_not_ok(query_r->submit()); + + // Only continue if Consolidation can make progress. The first buffer + // will always contain fixed size data, whether it is tile offsets for + // var size attribute/dimension or the actual fixed size data so we can + // use its size to know if any cells were written or not. + if (cw->sizes().at(0) == 0) { + if (buffer_size == initial_buffer_size) { + // If the first read failed, throw. + throw FragmentConsolidatorException( + "Consolidation read 0 cells, no progress can be made"); + } + // If it's not the first read, grow the buffer and try again. + buffer_size += std::min( + config_.total_budget_ - allocated_buffer_size, (2 * buffer_size)); + } else { + buffer_queue.push(cw); + } - do { - // READ - throw_if_not_ok(query_r->submit()); - - // If Consolidation cannot make any progress, throw. The first buffer will - // always contain fixed size data, whether it is tile offsets for var size - // attribute/dimension or the actual fixed size data so we can use its size - // to know if any cells were written or not. - if (cw.sizes().at(0) == 0) { - throw FragmentConsolidatorException( - "Consolidation read 0 cells, no progress can be made"); + // Once the read is complete, drain the queue and exit the reader. + // Note: drain() shuts down the queue without removing elements. + // The write fiber will be notified and write the remaining chunks. + if (query_r->status() != QueryStatus::INCOMPLETE) { + buffer_queue.drain(); + reading = false; + break; + } + } catch (...) { + // Enqueue caught-exceptions to be handled by the writer. + buffer_queue.push(std::current_exception()); + allocated_buffer_size++; // increase buffer size to maintain queue + // logic + reading = false; + break; + } + allocated_buffer_size += buffer_size; + + io_tp.wait_until( + [&]() { return allocated_buffer_size < config_.total_budget_; }); + } + return Status::Ok(); + }); + + // Writer + while (true) { + // Allow ProducerConsumerQueue to wait for an element to be enqueued. + auto buffer_queue_element = buffer_queue.pop_back(); + if (!buffer_queue_element.has_value()) { + // Stop writing once the queue is empty + break; } - // Set explicitly the write query buffers, as the sizes may have - // been altered by the read query. - set_query_buffers(query_w, cw); + auto& buffer = buffer_queue_element.value(); + // Rethrow read-enqueued exceptions. + if (std::holds_alternative(buffer)) { + std::rethrow_exception(std::get(buffer)); + } // WRITE - throw_if_not_ok(query_w->submit()); - } while (query_r->status() == QueryStatus::INCOMPLETE); + auto& writebuf = std::get<0>(buffer); + try { + // Explicitly set the write query buffers, as the sizes may have + // been altered by the read query. + set_query_buffers(query_w, *writebuf.get()); + throw_if_not_ok(query_w->submit()); + allocated_buffer_size -= buffer_size; + } catch (...) { + reading = false; // Stop the reader. + throw_if_not_ok(read_task.wait()); + throw; + } + } + + // Wait for reader to finish + throw_if_not_ok(read_task.wait()); + + // Finalize write query + throw_if_not_ok(query_w->finalize()); } Status FragmentConsolidator::create_queries( diff --git a/tiledb/sm/consolidator/fragment_consolidator.h b/tiledb/sm/consolidator/fragment_consolidator.h index 3fcae353d7e..1d8abf6484d 100644 --- a/tiledb/sm/consolidator/fragment_consolidator.h +++ b/tiledb/sm/consolidator/fragment_consolidator.h @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2022-2024 TileDB, Inc. + * @copyright Copyright (c) 2022-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -293,6 +293,7 @@ class FragmentConsolidator : public Consolidator { * @param new_fragment_uri The URI of the fragment created after * consolidating the `to_consolidate` fragments. * @param cw A workspace containing buffers for the queries + * * @return Status */ Status consolidate_internal( @@ -300,20 +301,24 @@ class FragmentConsolidator : public Consolidator { shared_ptr array_for_writes, const std::vector& to_consolidate, const NDRange& union_non_empty_domains, - URI* new_fragment_uri, - FragmentConsolidationWorkspace& cw); + URI* new_fragment_uri); /** - * Copies the array by reading from the fragments to be consolidated - * with `query_r` and writing to the new fragment with `query_w`. + * Copies the array by concurrently reading from the fragments to be + * consolidated with `query_r` and writing to the new fragment with `query_w`. * It also appropriately sets the query buffers. * * @param query_r The read query. * @param query_w The write query. - * @param cw A workspace containing buffers for the queries + * @param reader_array_schema_latest The reader's latest array schema. + * @param avg_var_cell_sizes A map of the reader's computed average cell size + * for var size attrs / dims. */ void copy_array( - Query* query_r, Query* query_w, FragmentConsolidationWorkspace& cw); + Query* query_r, + Query* query_w, + const ArraySchema& reader_array_schema_latest, + std::unordered_map avg_var_cell_sizes); /** * Creates the queries needed for consolidation. It also retrieves diff --git a/tiledb/sm/filesystem/mem_filesystem.cc b/tiledb/sm/filesystem/mem_filesystem.cc index 56398160179..546d2a17542 100644 --- a/tiledb/sm/filesystem/mem_filesystem.cc +++ b/tiledb/sm/filesystem/mem_filesystem.cc @@ -185,7 +185,8 @@ class MemFilesystem::File : public MemFilesystem::FSNode { if (offset + nbytes > size_) return LOG_STATUS(Status_MemFSError(fmt::format( - "Cannot read from file; Read exceeds file size: offset {} nbytes {} " + "Cannot read from file; Read exceeds file size: offset {} nbytes " + "{} " "size_ {}", offset, nbytes, @@ -559,7 +560,8 @@ MemFilesystem::FSNode* MemFilesystem::create_dir_internal( cur->children_[token] = tdb_unique_ptr(tdb_new(Directory)); } else if (!cur->is_dir()) { throw MemFSException(std::string( - "Cannot create directory, a file with that name exists already: " + + "Cannot create directory, a file with that name exists " + "already: " + path)); } diff --git a/tiledb/sm/filesystem/posix.cc b/tiledb/sm/filesystem/posix.cc index c29aa26d86b..6d2d349f9f0 100644 --- a/tiledb/sm/filesystem/posix.cc +++ b/tiledb/sm/filesystem/posix.cc @@ -245,7 +245,8 @@ uint64_t Posix::read( uint64_t file_size = this->file_size(URI(path)); if (offset + nbytes > file_size) { throw IOError(fmt::format( - "Cannot read from file; Read exceeds file size: offset {}, nbytes {}, " + "Cannot read from file; Read exceeds file size: offset {}, nbytes " + "{}, " "file_size {}, URI {}", offset, nbytes, diff --git a/tiledb/sm/filesystem/win.cc b/tiledb/sm/filesystem/win.cc index c0e8a51f6f9..67c2caf598c 100644 --- a/tiledb/sm/filesystem/win.cc +++ b/tiledb/sm/filesystem/win.cc @@ -483,7 +483,8 @@ uint64_t Win::read( } throw WindowsException(fmt::format( - "Cannot read from file '{}'; File read error '{}' offset {} nbytes " + "Cannot read from file '{}'; File read error '{}' offset {} " + "nbytes " "{}", path, err_msg,