Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions test/src/unit-capi-consolidation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* The MIT License
*
* @copyright Copyright (c) 2017-2021 TileDB Inc.
* @copyright Copyright (c) 2017-2025 TileDB Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -7596,16 +7596,32 @@ TEST_CASE_METHOD(

write_sparse_string_full();
write_sparse_string_unordered();
consolidate_sparse_string(1, true);

uint64_t string_size = 1;
std::string errmsg = "";

SECTION("too small") {
string_size = 1;
errmsg =
"FragmentConsolidator: Consolidation read 0 cells, no "
"progress can be made";
}

SECTION("too large") {
string_size = 10737418240 + 2; // over default memory budget of 10GB
errmsg =
"FragmentConsolidator: Consolidation cannot proceed without "
"disrespecting the memory budget.";
}

consolidate_sparse_string(string_size, true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this write only the one input cell?
If so then the write queue is always empty for this test.
We should have additional tests which contain huge cells which occur when the write queue has data in it. e.g. the buffer is 4G, the writer is processing 1G, there are two buffers queued at 1G each, and the next cell is 2G.
We must have coverage for waiting for there to be enough space to read more data.


tiledb_error_t* err = NULL;
tiledb_ctx_get_last_error(ctx_, &err);

const char* msg;
tiledb_error_message(err, &msg);
CHECK(
std::string("FragmentConsolidator: Consolidation read 0 cells, no "
"progress can be made") == msg);
CHECK(errmsg == msg);

remove_sparse_string_array();
}
Expand Down
208 changes: 146 additions & 62 deletions tiledb/sm/consolidator/fragment_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <iostream>
#include <numeric>
#include <sstream>
#include <variant>

using namespace tiledb::common;

Expand Down Expand Up @@ -248,8 +249,6 @@ Status FragmentConsolidator::consolidate(
return st;
}

FragmentConsolidationWorkspace cw(consolidator_memory_tracker_);

uint32_t step = 0;
std::vector<TimestampedURI> to_consolidate;
do {
Expand Down Expand Up @@ -282,8 +281,7 @@ Status FragmentConsolidator::consolidate(
array_for_writes,
to_consolidate,
union_non_empty_domains,
&new_fragment_uri,
cw);
&new_fragment_uri);
if (!st.ok()) {
throw_if_not_ok(array_for_reads->close());
throw_if_not_ok(array_for_writes->close());
Expand Down Expand Up @@ -463,17 +461,14 @@ Status FragmentConsolidator::consolidate_fragments(
}
}

FragmentConsolidationWorkspace cw(consolidator_memory_tracker_);

// Consolidate the selected fragments
URI new_fragment_uri;
st = consolidate_internal(
array_for_reads,
array_for_writes,
to_consolidate,
union_non_empty_domains,
&new_fragment_uri,
cw);
&new_fragment_uri);
if (!st.ok()) {
throw_if_not_ok(array_for_reads->close());
throw_if_not_ok(array_for_writes->close());
Expand Down Expand Up @@ -581,8 +576,7 @@ Status FragmentConsolidator::consolidate_internal(
shared_ptr<Array> array_for_writes,
const std::vector<TimestampedURI>& to_consolidate,
const NDRange& union_non_empty_domains,
URI* new_fragment_uri,
FragmentConsolidationWorkspace& cw) {
URI* new_fragment_uri) {
auto timer_se = stats_->start_timer("consolidate_internal");

array_for_reads->load_fragments(to_consolidate);
Expand Down Expand Up @@ -626,19 +620,13 @@ Status FragmentConsolidator::consolidate_internal(
uint64_t total_weights =
config_.buffers_weight_ + config_.reader_weight_ + config_.writer_weight_;
uint64_t single_unit_budget = config_.total_budget_ / total_weights;
uint64_t buffers_budget = config_.buffers_weight_ * single_unit_budget;
uint64_t reader_budget = config_.reader_weight_ * single_unit_budget;
uint64_t writer_budget = config_.writer_weight_ * single_unit_budget;

// Prepare buffers
auto average_var_cell_sizes = array_for_reads->get_average_var_cell_sizes();
cw.resize_buffers(
stats_, config_, array_schema, average_var_cell_sizes, buffers_budget);

// Create queries
tdb_unique_ptr<Query> query_r = nullptr;
tdb_unique_ptr<Query> query_w = nullptr;
throw_if_not_ok(create_queries(
RETURN_NOT_OK(create_queries(
array_for_reads,
array_for_writes,
union_non_empty_domains,
Expand All @@ -654,66 +642,162 @@ Status FragmentConsolidator::consolidate_internal(
vac_uri =
array_for_reads->array_directory().get_vacuum_uri(*new_fragment_uri);
} catch (std::exception& e) {
FragmentConsolidatorException(
throw FragmentConsolidatorException(
"Internal consolidation failed with exception" + std::string(e.what()));
}

// Read from one array and write to the other
copy_array(query_r.get(), query_w.get(), cw);

// Finalize write query
auto st = query_w->finalize();
if (!st.ok()) {
if (resources_.vfs().is_dir(*new_fragment_uri))
resources_.vfs().remove_dir(*new_fragment_uri);
return st;
}

// Write vacuum file
st = write_vacuum_file(
array_for_reads->array_schema_latest().write_version(),
array_for_reads->array_uri(),
vac_uri,
to_consolidate);
if (!st.ok()) {
if (resources_.vfs().is_dir(*new_fragment_uri))
// Consolidate fragments
try {
// Read from one array and write to the other
copy_array(
query_r.get(),
query_w.get(),
array_schema,
array_for_reads->get_average_var_cell_sizes());
// Write vacuum file
throw_if_not_ok(write_vacuum_file(
array_for_reads->array_schema_latest().write_version(),
array_for_reads->array_uri(),
vac_uri,
to_consolidate));
} catch (...) {
if (resources_.vfs().is_dir(*new_fragment_uri)) {
resources_.vfs().remove_dir(*new_fragment_uri);
return st;
}
std::rethrow_exception(std::current_exception());
}

return st;
return Status::Ok();
}

void FragmentConsolidator::copy_array(
Query* query_r, Query* query_w, FragmentConsolidationWorkspace& cw) {
auto timer_se = stats_->start_timer("consolidate_copy_array");
Query* query_r,
Query* query_w,
const ArraySchema& reader_array_schema_latest,
std::unordered_map<std::string, uint64_t> average_var_cell_sizes) {
// The size of the buffers.
// 10MB by default, unless total_budget_ is smaller, or buffer_size_ is set.
uint64_t buffer_size =
config_.buffer_size_ != 0 ?
config_.buffer_size_ :
std::min((uint64_t)10485760, config_.total_budget_);
// Initial, "ungrown", value of `buffer_size`.
const uint64_t initial_buffer_size = buffer_size;
if (buffer_size > config_.total_budget_) {
throw FragmentConsolidatorException(
"Consolidation cannot proceed without disrespecting the memory "
"budget.");
}

// Set the read query buffers outside the repeated submissions.
// The Reader will reset the query buffer sizes to the original
// sizes, not the potentially smaller sizes of the results after
// the query submission.
set_query_buffers(query_r, cw);
// Deque which stores the buffers passed between the reader and writer.
// Cannot exceed `config_.total_budget_`.
ProducerConsumerQueue<std::variant<
tdb_shared_ptr<FragmentConsolidationWorkspace>,
std::exception_ptr>>
buffer_queue;

// Atomic counter of the queue buffers allocated by the reader.
// May not exceed `config_.total_budget_`.
std::atomic<uint64_t> allocated_buffer_size = 0;

// Flag indicating an ongoing read. The reader will stop once set to `false`.
std::atomic<bool> reading = true;

// Reader
auto& io_tp = resources_.io_tp();
ThreadPool::Task read_task = io_tp.execute([&] {
while (reading) {
tdb_shared_ptr<FragmentConsolidationWorkspace> cw =
tdb::make_shared<FragmentConsolidationWorkspace>(
HERE(), consolidator_memory_tracker_);
// READ
try {
// Set the read query buffers.
cw->resize_buffers(
stats_,
config_,
reader_array_schema_latest,
average_var_cell_sizes,
buffer_size);
set_query_buffers(query_r, *cw.get());
throw_if_not_ok(query_r->submit());

// Only continue if Consolidation can make progress. The first buffer
// will always contain fixed size data, whether it is tile offsets for
// var size attribute/dimension or the actual fixed size data so we can
// use its size to know if any cells were written or not.
if (cw->sizes().at(0) == 0) {
if (buffer_size == initial_buffer_size) {
// If the first read failed, throw.
throw FragmentConsolidatorException(
"Consolidation read 0 cells, no progress can be made");
}
// If it's not the first read, grow the buffer and try again.
buffer_size += std::min(
config_.total_budget_ - allocated_buffer_size, (2 * buffer_size));
} else {
buffer_queue.push(cw);
}

do {
// READ
throw_if_not_ok(query_r->submit());

// If Consolidation cannot make any progress, throw. The first buffer will
// always contain fixed size data, whether it is tile offsets for var size
// attribute/dimension or the actual fixed size data so we can use its size
// to know if any cells were written or not.
if (cw.sizes().at(0) == 0) {
throw FragmentConsolidatorException(
"Consolidation read 0 cells, no progress can be made");
// Once the read is complete, drain the queue and exit the reader.
// Note: drain() shuts down the queue without removing elements.
// The write fiber will be notified and write the remaining chunks.
if (query_r->status() != QueryStatus::INCOMPLETE) {
buffer_queue.drain();
reading = false;
break;
}
} catch (...) {
// Enqueue caught-exceptions to be handled by the writer.
buffer_queue.push(std::current_exception());
allocated_buffer_size++; // increase buffer size to maintain queue
// logic
reading = false;
break;
}
allocated_buffer_size += buffer_size;

io_tp.wait_until(
[&]() { return allocated_buffer_size < config_.total_budget_; });
}
return Status::Ok();
});

// Writer
while (true) {
// Allow ProducerConsumerQueue to wait for an element to be enqueued.
auto buffer_queue_element = buffer_queue.pop_back();
if (!buffer_queue_element.has_value()) {
// Stop writing once the queue is empty
break;
}

// Set explicitly the write query buffers, as the sizes may have
// been altered by the read query.
set_query_buffers(query_w, cw);
auto& buffer = buffer_queue_element.value();
// Rethrow read-enqueued exceptions.
if (std::holds_alternative<std::exception_ptr>(buffer)) {
std::rethrow_exception(std::get<std::exception_ptr>(buffer));
}

// WRITE
throw_if_not_ok(query_w->submit());
} while (query_r->status() == QueryStatus::INCOMPLETE);
auto& writebuf = std::get<0>(buffer);
try {
// Explicitly set the write query buffers, as the sizes may have
// been altered by the read query.
set_query_buffers(query_w, *writebuf.get());
throw_if_not_ok(query_w->submit());
allocated_buffer_size -= buffer_size;
} catch (...) {
reading = false; // Stop the reader.
throw_if_not_ok(read_task.wait());
throw;
}
}

// Wait for reader to finish
throw_if_not_ok(read_task.wait());

// Finalize write query
throw_if_not_ok(query_w->finalize());
}

Status FragmentConsolidator::create_queries(
Expand Down
19 changes: 12 additions & 7 deletions tiledb/sm/consolidator/fragment_consolidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
* The MIT License
*
* @copyright Copyright (c) 2022-2024 TileDB, Inc.
* @copyright Copyright (c) 2022-2025 TileDB, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -293,27 +293,32 @@ class FragmentConsolidator : public Consolidator {
* @param new_fragment_uri The URI of the fragment created after
* consolidating the `to_consolidate` fragments.
* @param cw A workspace containing buffers for the queries
*
* @return Status
*/
Status consolidate_internal(
shared_ptr<Array> array_for_reads,
shared_ptr<Array> array_for_writes,
const std::vector<TimestampedURI>& to_consolidate,
const NDRange& union_non_empty_domains,
URI* new_fragment_uri,
FragmentConsolidationWorkspace& cw);
URI* new_fragment_uri);

/**
* Copies the array by reading from the fragments to be consolidated
* with `query_r` and writing to the new fragment with `query_w`.
* Copies the array by concurrently reading from the fragments to be
* consolidated with `query_r` and writing to the new fragment with `query_w`.
* It also appropriately sets the query buffers.
*
* @param query_r The read query.
* @param query_w The write query.
* @param cw A workspace containing buffers for the queries
* @param reader_array_schema_latest The reader's latest array schema.
* @param avg_var_cell_sizes A map of the reader's computed average cell size
* for var size attrs / dims.
*/
void copy_array(
Query* query_r, Query* query_w, FragmentConsolidationWorkspace& cw);
Query* query_r,
Query* query_w,
const ArraySchema& reader_array_schema_latest,
std::unordered_map<std::string, uint64_t> avg_var_cell_sizes);

/**
* Creates the queries needed for consolidation. It also retrieves
Expand Down
6 changes: 4 additions & 2 deletions tiledb/sm/filesystem/mem_filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ class MemFilesystem::File : public MemFilesystem::FSNode {

if (offset + nbytes > size_)
return LOG_STATUS(Status_MemFSError(fmt::format(
"Cannot read from file; Read exceeds file size: offset {} nbytes {} "
"Cannot read from file; Read exceeds file size: offset {} nbytes "
"{} "
"size_ {}",
offset,
nbytes,
Expand Down Expand Up @@ -559,7 +560,8 @@ MemFilesystem::FSNode* MemFilesystem::create_dir_internal(
cur->children_[token] = tdb_unique_ptr<FSNode>(tdb_new(Directory));
} else if (!cur->is_dir()) {
throw MemFSException(std::string(
"Cannot create directory, a file with that name exists already: " +
"Cannot create directory, a file with that name exists "
"already: " +
path));
}

Expand Down
Loading
Loading