Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tiledb/api/c_api/group/group_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ capi_return_t tiledb_group_consolidate_metadata(
ensure_group_uri_argument_is_valid(group_uri);

auto cfg = (config == nullptr) ? ctx->config() : config->config();
throw_if_not_ok(cfg.set("sm.consolidation.mode", "group_meta"));
tiledb::sm::Group::consolidate_metadata(ctx->resources(), group_uri, cfg);

return TILEDB_OK;
Expand All @@ -554,6 +555,7 @@ capi_return_t tiledb_group_vacuum_metadata(
ensure_group_uri_argument_is_valid(group_uri);

auto cfg = (config == nullptr) ? ctx->config() : config->config();
throw_if_not_ok(cfg.set("sm.vacuum.mode", "group_meta"));
sm::Group::vacuum_metadata(ctx->resources(), group_uri, cfg);

return TILEDB_OK;
Expand Down
1 change: 0 additions & 1 deletion tiledb/sm/consolidator/array_meta_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ Status ArrayMetaConsolidator::consolidate(
const void* encryption_key,
uint32_t key_length) {
auto timer_se = stats_->start_timer("consolidate_array_meta");
check_array_uri(array_name);

// Open array for reading
auto array_uri = URI(array_name);
Expand Down
2 changes: 0 additions & 2 deletions tiledb/sm/consolidator/commits_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ Status CommitsConsolidator::consolidate(
uint32_t key_length) {
auto timer_se = stats_->start_timer("consolidate_commits");

check_array_uri(array_name);

// Open array for writing
auto array_uri = URI(array_name);
Array array_for_writes(resources_, array_uri);
Expand Down
25 changes: 9 additions & 16 deletions tiledb/sm/consolidator/consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ void Consolidator::array_consolidate(
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
}

// Check if array exists
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}

if (array_uri.is_tiledb()) {
throw_if_not_ok(
resources.rest_client()->post_consolidation_to_rest(array_uri, config));
throw_if_not_ok(resources.rest_client()->post_array_consolidation_to_rest(
array_uri, config));
} else {
// Check if array exists
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}

// Get encryption key from config
std::string encryption_key_from_cfg;
if (!encryption_key) {
Expand Down Expand Up @@ -315,7 +315,7 @@ void Consolidator::array_vacuum(
URI array_uri(array_name);
if (array_uri.is_tiledb()) {
throw_if_not_ok(
resources.rest_client()->post_vacuum_to_rest(array_uri, config));
resources.rest_client()->post_array_vacuum_to_rest(array_uri, config));
return;
}

Expand All @@ -325,11 +325,4 @@ void Consolidator::array_vacuum(
consolidator->vacuum(array_name);
}

void Consolidator::check_array_uri(const char* array_name) {
if (URI(array_name).is_tiledb()) {
throw ConsolidatorException(
"Consolidation is not supported for remote arrays.");
}
}

} // namespace tiledb::sm
7 changes: 0 additions & 7 deletions tiledb/sm/consolidator/consolidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,6 @@ class Consolidator {
explicit Consolidator(
ContextResources& resources, StorageManager* storage_manager);

/**
* Checks if the array is remote.
*
* @param array_name Name of the array to check.
*/
void check_array_uri(const char* array_name);

/* ********************************* */
/* PROTECTED ATTRIBUTES */
/* ********************************* */
Expand Down
2 changes: 0 additions & 2 deletions tiledb/sm/consolidator/fragment_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ Status FragmentConsolidator::consolidate(
uint32_t key_length) {
auto timer_se = stats_->start_timer("consolidate_frags");

check_array_uri(array_name);

// Open array for reading
auto array_for_reads{make_shared<Array>(HERE(), resources_, URI(array_name))};
throw_if_not_ok(array_for_reads->open_without_fragments(
Expand Down
2 changes: 0 additions & 2 deletions tiledb/sm/consolidator/fragment_meta_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ Status FragmentMetaConsolidator::consolidate(
uint32_t key_length) {
auto timer_se = stats_->start_timer("consolidate_frag_meta");

check_array_uri(array_name);

// Open array for reading
Array array(resources_, URI(array_name));
throw_if_not_ok(
Expand Down
1 change: 0 additions & 1 deletion tiledb/sm/consolidator/group_meta_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ GroupMetaConsolidator::GroupMetaConsolidator(
Status GroupMetaConsolidator::consolidate(
const char* group_name, EncryptionType, const void*, uint32_t) {
auto timer_se = stats_->start_timer("consolidate_group_meta");
check_array_uri(group_name);

// Open group for reading
auto group_uri = URI(group_name);
Expand Down
36 changes: 24 additions & 12 deletions tiledb/sm/group/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,19 +570,25 @@ void Group::consolidate_metadata(
if (group_uri.is_invalid()) {
throw GroupException("Cannot consolidate group metadata; Invalid URI");
}
// Check if group exists
if (object_type(resources, group_uri) != ObjectType::GROUP) {
throw GroupException(
"Cannot consolidate group metadata; Group does not exist");
}

// Consolidate
// Encryption credentials are loaded by Group from config
StorageManager sm(resources, resources.logger(), config);
auto consolidator = Consolidator::create(
resources, ConsolidationMode::GROUP_META, config, &sm);
throw_if_not_ok(consolidator->consolidate(
group_name, EncryptionType::NO_ENCRYPTION, nullptr, 0));
if (group_uri.is_tiledb()) {
throw_if_not_ok(resources.rest_client()->post_group_consolidation_to_rest(
group_uri, config));
} else {
// Check if group exists
if (object_type(resources, group_uri) != ObjectType::GROUP) {
throw GroupException(
"Cannot consolidate group metadata; Group does not exist");
}

// Consolidate
// Encryption credentials are loaded by Group from config
StorageManager sm(resources, resources.logger(), config);
auto consolidator = Consolidator::create(
resources, ConsolidationMode::GROUP_META, config, &sm);
throw_if_not_ok(consolidator->consolidate(
group_name, EncryptionType::NO_ENCRYPTION, nullptr, 0));
}
}

void Group::vacuum_metadata(
Expand All @@ -593,6 +599,12 @@ void Group::vacuum_metadata(
throw GroupException("Cannot vacuum group metadata; Invalid URI");
}

if (group_uri.is_tiledb()) {
throw_if_not_ok(
resources.rest_client()->post_group_vacuum_to_rest(group_uri, config));
return;
}

// Check if group exists
if (object_type(resources, group_uri) != ObjectType::GROUP) {
throw GroupException("Cannot vacuum group metadata; Group does not exist");
Expand Down
16 changes: 14 additions & 2 deletions tiledb/sm/rest/rest_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -490,12 +490,24 @@ class RestClient {
}

/// Operation disabled in base class.
inline virtual Status post_consolidation_to_rest(const URI&, const Config&) {
inline virtual Status post_array_consolidation_to_rest(
const URI&, const Config&) {
throw RestClientDisabledException();
}

/// Operation disabled in base class.
inline virtual Status post_vacuum_to_rest(const URI&, const Config&) {
inline virtual Status post_group_consolidation_to_rest(
const URI&, const Config&) {
throw RestClientDisabledException();
}

/// Operation disabled in base class.
inline virtual Status post_array_vacuum_to_rest(const URI&, const Config&) {
throw RestClientDisabledException();
}

/// Operation disabled in base class.
inline virtual Status post_group_vacuum_to_rest(const URI&, const Config&) {
throw RestClientDisabledException();
}

Expand Down
56 changes: 54 additions & 2 deletions tiledb/sm/rest/rest_client_remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1560,7 +1560,7 @@ void RestClientRemote::delete_group_from_rest(const URI& uri, bool recursive) {
stats_, url, serialization_type_, &returned_data, cache_key));
}

Status RestClientRemote::post_consolidation_to_rest(
Status RestClientRemote::post_array_consolidation_to_rest(
const URI& uri, const Config& config) {
BufferList serialized{memory_tracker_};
auto& buff = serialized.emplace_buffer();
Expand All @@ -1586,7 +1586,33 @@ Status RestClientRemote::post_consolidation_to_rest(
stats_, url, serialization_type_, &serialized, &returned_data, cache_key);
}

Status RestClientRemote::post_vacuum_to_rest(
Status RestClientRemote::post_group_consolidation_to_rest(
const URI& uri, const Config& config) {
BufferList serialized{memory_tracker_};
auto& buff = serialized.emplace_buffer();
RETURN_NOT_OK(serialization::array_consolidation_request_serialize(
config, serialization_type_, buff));

// Init curl and form the URL
Curl curlc(logger_);
URI::RESTURIComponents rest_uri;
RETURN_NOT_OK(uri.get_rest_components(rest_legacy(), &rest_uri));
const std::string cache_key =
rest_uri.server_namespace + ":" + rest_uri.server_path;
RETURN_NOT_OK(
curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
const std::string url =
redirect_uri(cache_key) + "/v2/groups/" +
curlc.url_escape_namespace(rest_uri.server_namespace) + "/" +
curlc.url_escape(rest_uri.server_path) + "/consolidate";

// Get the data
Buffer returned_data;
return curlc.post_data(
stats_, url, serialization_type_, &serialized, &returned_data, cache_key);
}

Status RestClientRemote::post_array_vacuum_to_rest(
const URI& uri, const Config& config) {
BufferList serialized{memory_tracker_};
auto& buff = serialized.emplace_buffer();
Expand All @@ -1612,6 +1638,32 @@ Status RestClientRemote::post_vacuum_to_rest(
stats_, url, serialization_type_, &serialized, &returned_data, cache_key);
}

Status RestClientRemote::post_group_vacuum_to_rest(
const URI& uri, const Config& config) {
BufferList serialized{memory_tracker_};
auto& buff = serialized.emplace_buffer();
RETURN_NOT_OK(serialization::array_vacuum_request_serialize(
config, serialization_type_, buff));

// Init curl and form the URL
Curl curlc(logger_);
URI::RESTURIComponents rest_uri;
RETURN_NOT_OK(uri.get_rest_components(rest_legacy(), &rest_uri));
const std::string cache_key =
rest_uri.server_namespace + ":" + rest_uri.server_path;
RETURN_NOT_OK(
curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
const std::string url =
redirect_uri(cache_key) + "/v2/groups/" +
curlc.url_escape_namespace(rest_uri.server_namespace) + "/" +
curlc.url_escape(rest_uri.server_path) + "/vacuum";

// Get the data
Buffer returned_data;
return curlc.post_data(
stats_, url, serialization_type_, &serialized, &returned_data, cache_key);
}

std::vector<std::vector<std::string>>
RestClientRemote::post_consolidation_plan_from_rest(
const URI& uri, const Config& config, uint64_t fragment_size) {
Expand Down
25 changes: 23 additions & 2 deletions tiledb/sm/rest/rest_client_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,17 @@ class RestClientRemote : public RestClient {
* @param config config
* @return
*/
Status post_consolidation_to_rest(
Status post_array_consolidation_to_rest(
const URI& uri, const Config& config) override;

/**
* Post group consolidation request to the REST server.
*
* @param uri Group URI
* @param config config
* @return
*/
Status post_group_consolidation_to_rest(
const URI& uri, const Config& config) override;

/**
Expand All @@ -476,7 +486,18 @@ class RestClientRemote : public RestClient {
* @param config config
* @return
*/
Status post_vacuum_to_rest(const URI& uri, const Config& config) override;
Status post_array_vacuum_to_rest(
const URI& uri, const Config& config) override;

/**
* Post group vacuum request to the REST server.
*
* @param uri group URI
* @param config config
* @return
*/
Status post_group_vacuum_to_rest(
const URI& uri, const Config& config) override;

/**
* Get consolidation plan from the REST server via POST request.
Expand Down
Loading