Skip to content

Commit 8b40db2

Browse files
committed
Support remote arrays for fragment_list consolidation
1 parent 69d2a62 commit 8b40db2

File tree

10 files changed

+254
-127
lines changed

10 files changed

+254
-127
lines changed

test/src/unit-capi-consolidation.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7169,7 +7169,7 @@ TEST_CASE_METHOD(
71697169
TEST_CASE_METHOD(
71707170
ConsolidationFx,
71717171
"C API: Test consolidation, dense split fragments",
7172-
"[capi][consolidation][dense][split-fragments][non-rest]") {
7172+
"[capi][consolidation][dense][split-fragments][rest]") {
71737173
remove_dense_array();
71747174
create_dense_array();
71757175
write_dense_subarray(1, 2, 1, 2);
@@ -7253,7 +7253,7 @@ TEST_CASE_METHOD(
72537253
TEST_CASE_METHOD(
72547254
ConsolidationFx,
72557255
"C API: Test consolidation, sparse split fragments",
7256-
"[capi][consolidation][sparse][split-fragments][non-rest]") {
7256+
"[capi][consolidation][sparse][split-fragments][rest]") {
72577257
remove_sparse_array();
72587258
create_sparse_array();
72597259
write_sparse_row(0);

test/support/src/serialization_wrappers.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@
3232
*/
3333

3434
#include "test/support/src/helpers.h"
35+
#include "tiledb/api/c_api/buffer/buffer_api_internal.h"
36+
#include "tiledb/api/c_api/context/context_api_internal.h"
3537
#include "tiledb/sm/c_api/tiledb.h"
3638
#include "tiledb/sm/c_api/tiledb_serialization.h"
3739
#include "tiledb/sm/c_api/tiledb_struct_def.h"
40+
#include "tiledb/sm/serialization/consolidation.h"
3841
#include "tiledb/sm/serialization/query.h"
3942

4043
#ifdef TILEDB_SERIALIZATION
@@ -218,3 +221,28 @@ void tiledb_subarray_serialize(
218221
*subarray = deserialized_subarray;
219222
#endif
220223
}
224+
225+
/**
 * Round-trips an array consolidation request through serialization.
 *
 * Serializes the context's config together with `fragment_uris_in` into a
 * temporary buffer handle using `serialize_type`, deserializes that buffer
 * again and, when the deserialized request carries a fragment list, copies
 * that list into `fragment_uris_out`.
 *
 * @param ctx Context whose config is written into the request.
 * @param serialize_type Serialization format used in both directions.
 * @param fragment_uris_in Fragment URIs handed to the serializer as-is.
 * @param fragment_uris_out Receives the deserialized fragment URIs. It is
 *     left untouched when the deserialized request has no fragment list or
 *     when this pointer is null.
 */
void tiledb_array_consolidation_request_wrapper(
    tiledb_ctx_t* ctx,
    tiledb_serialization_type_t serialize_type,
    const std::vector<std::string>* fragment_uris_in,
    std::vector<std::string>* fragment_uris_out) {
  auto buffer = tiledb_buffer_handle_t::make_handle();

  // Release the temporary handle on every exit path. Without the guard a
  // throwing serialize/deserialize call would skip `break_handle` and leak it.
  struct BufferHandleGuard {
    decltype(buffer)& handle;
    ~BufferHandleGuard() {
      tiledb_buffer_handle_t::break_handle(handle);
    }
  } buffer_guard{buffer};

  const auto serialization_type =
      static_cast<tiledb::sm::SerializationType>(serialize_type);

  // Serialize ...
  serialization::array_consolidation_request_serialize(
      ctx->config(), fragment_uris_in, serialization_type, &(buffer->buffer()));

  // ... and deserialize. Only the fragment list is reported to the caller.
  auto [config, fragment_uris_deser] =
      serialization::array_consolidation_request_deserialize(
          serialization_type, buffer->buffer());

  if (fragment_uris_out != nullptr && fragment_uris_deser.has_value()) {
    *fragment_uris_out = fragment_uris_deser.value();
  }
}

test/support/src/serialization_wrappers.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#define TILEDB_TEST_SERIALIZATION_WRAPPERS_H
3636

3737
#include <string>
38+
#include <vector>
3839

3940
#include "tiledb/sm/c_api/tiledb.h"
4041
#include "tiledb/sm/c_api/tiledb_serialization.h"
@@ -141,4 +142,11 @@ int tiledb_fragment_info_serialize(
141142
*/
142143
void tiledb_subarray_serialize(
143144
tiledb_ctx_t* ctx, tiledb_array_t* array, tiledb_subarray_t** subarray);
145+
146+
/**
 * Serializes an array consolidation request and deserializes it again.
 *
 * The request is built from the context's config and `fragment_uris_in`,
 * written to a temporary buffer in the `serialize_type` format, and read
 * back from that buffer.
 *
 * @param ctx Context whose config is serialized into the request.
 * @param serialize_type Serialization format used in both directions.
 * @param fragment_uris_in Fragment URIs to serialize with the request.
 * @param fragment_uris_out Set to the deserialized fragment URIs when the
 *     deserialized request contains a fragment list.
 */
void tiledb_array_consolidation_request_wrapper(
147+
tiledb_ctx_t* ctx,
148+
tiledb_serialization_type_t serialize_type,
149+
const std::vector<std::string>* fragment_uris_in,
150+
std::vector<std::string>* fragment_uris_out);
151+
144152
#endif // TILEDB_TEST_SERIALIZATION_WRAPPERS_H

tiledb/sm/c_api/tiledb.cc

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2318,14 +2318,32 @@ int32_t tiledb_array_create(
23182318

23192319
int32_t tiledb_array_consolidate(
23202320
tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) {
2321+
// Validate input arguments
2322+
api::ensure_context_is_valid(ctx);
23212323
api::ensure_config_is_valid_if_present(config);
2324+
2325+
auto uri = tiledb::sm::URI(array_uri);
2326+
if (uri.is_invalid()) {
2327+
throw api::CAPIStatusException(
2328+
"Failed to consolidate fragments; Invalid input array uri");
2329+
}
2330+
2331+
auto input_config = (config == nullptr) ? ctx->config() : config->config();
2332+
if (uri.is_tiledb() &&
2333+
tiledb::sm::Consolidator::mode_from_config(input_config) ==
2334+
tiledb::sm::ConsolidationMode::FRAGMENT) {
2335+
throw api::CAPIStatusException(
2336+
"Please use tiledb_array_consolidate_fragments API for consolidating "
2337+
"fragments on remote arrays.");
2338+
}
2339+
23222340
tiledb::sm::Consolidator::array_consolidate(
23232341
ctx->resources(),
23242342
array_uri,
23252343
tiledb::sm::EncryptionType::NO_ENCRYPTION,
23262344
nullptr,
23272345
0,
2328-
(config == nullptr) ? ctx->config() : config->config(),
2346+
input_config,
23292347
ctx->storage_manager());
23302348
return TILEDB_OK;
23312349
}
@@ -2336,7 +2354,33 @@ int32_t tiledb_array_consolidate_fragments(
23362354
const char** fragment_uris,
23372355
const uint64_t num_fragments,
23382356
tiledb_config_t* config) {
2339-
// Sanity checks
2357+
// Validate input arguments
2358+
api::ensure_context_is_valid(ctx);
2359+
api::ensure_config_is_valid_if_present(config);
2360+
2361+
if (fragment_uris == nullptr) {
2362+
throw api::CAPIStatusException(
2363+
"Failed to consolidate fragments; Invalid input fragment list");
2364+
}
2365+
2366+
auto uri = tiledb::sm::URI(array_uri);
2367+
if (uri.is_invalid()) {
2368+
throw api::CAPIStatusException(
2369+
"Failed to consolidate fragments; Invalid input array uri");
2370+
}
2371+
2372+
if (num_fragments < 1) {
2373+
throw api::CAPIStatusException(
2374+
"Failed to consolidate fragments; Invalid input number of fragments");
2375+
}
2376+
2377+
for (size_t i = 0; i < num_fragments; i++) {
2378+
if (tiledb::sm::URI(fragment_uris[i]).is_invalid()) {
2379+
throw api::CAPIStatusException(
2380+
"Failed to consolidate fragments; Invalid uri(s) in input fragment "
2381+
"list");
2382+
}
2383+
}
23402384

23412385
// Convert the list of fragments to a vector
23422386
std::vector<std::string> uris;

tiledb/sm/consolidator/consolidator.cc

Lines changed: 50 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,16 @@ void Consolidator::array_consolidate(
151151
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
152152
}
153153

154-
// Check if array exists
155-
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
156-
throw ConsolidatorException(
157-
"Cannot consolidate array; Array does not exist");
158-
}
159-
160154
if (array_uri.is_tiledb()) {
161155
throw_if_not_ok(
162156
resources.rest_client()->post_consolidation_to_rest(array_uri, config));
163157
} else {
158+
// Check if array exists
159+
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
160+
throw ConsolidatorException(
161+
"Cannot consolidate array; Array does not exist");
162+
}
163+
164164
// Get encryption key from config
165165
std::string encryption_key_from_cfg;
166166
if (!encryption_key) {
@@ -212,46 +212,54 @@ void Consolidator::fragments_consolidate(
212212
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
213213
}
214214

215-
// Check if array exists
216-
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
217-
throw ConsolidatorException(
218-
"Cannot consolidate array; Array does not exist");
219-
}
215+
if (array_uri.is_tiledb()) {
216+
throw_if_not_ok(
217+
storage_manager->resources().rest_client()->post_consolidation_to_rest(
218+
array_uri, config, &fragment_uris));
219+
} else {
220+
// Check if array exists
221+
if (object_type(resources, array_uri) != ObjectType::ARRAY) {
222+
throw ConsolidatorException(
223+
"Cannot consolidate array; Array does not exist");
224+
}
220225

221-
// Get encryption key from config
222-
std::string encryption_key_from_cfg;
223-
if (!encryption_key) {
224-
bool found = false;
225-
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
226-
assert(found);
227-
}
226+
// Get encryption key from config
227+
std::string encryption_key_from_cfg;
228+
if (!encryption_key) {
229+
bool found = false;
230+
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
231+
assert(found);
232+
}
228233

229-
if (!encryption_key_from_cfg.empty()) {
230-
encryption_key = encryption_key_from_cfg.c_str();
231-
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
232-
std::string encryption_type_from_cfg;
233-
bool found = false;
234-
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
235-
assert(found);
236-
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
237-
throw_if_not_ok(st);
238-
encryption_type = et.value();
239-
240-
if (!EncryptionKey::is_valid_key_length(
241-
encryption_type,
242-
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
243-
encryption_key = nullptr;
244-
key_length = 0;
234+
if (!encryption_key_from_cfg.empty()) {
235+
encryption_key = encryption_key_from_cfg.c_str();
236+
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
237+
std::string encryption_type_from_cfg;
238+
bool found = false;
239+
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
240+
assert(found);
241+
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
242+
throw_if_not_ok(st);
243+
encryption_type = et.value();
244+
245+
if (!EncryptionKey::is_valid_key_length(
246+
encryption_type,
247+
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
248+
encryption_key = nullptr;
249+
key_length = 0;
250+
}
245251
}
246-
}
247252

248-
// Consolidate
249-
auto consolidator = Consolidator::create(
250-
resources, ConsolidationMode::FRAGMENT, config, storage_manager);
251-
auto fragment_consolidator =
252-
dynamic_cast<FragmentConsolidator*>(consolidator.get());
253-
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
254-
array_name, encryption_type, encryption_key, key_length, fragment_uris));
253+
// Consolidate
254+
auto fragment_consolidator =
255+
make_shared<FragmentConsolidator>(HERE(), config, storage_manager);
256+
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
257+
array_name,
258+
encryption_type,
259+
encryption_key,
260+
key_length,
261+
fragment_uris));
262+
}
255263
}
256264

257265
void Consolidator::write_consolidated_commits_file(

tiledb/sm/rest/rest_client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ class RestClient {
463463
}
464464

465465
/// Operation disabled in base class.
466-
inline virtual Status post_consolidation_to_rest(const URI&, const Config&) {
466+
inline virtual Status post_consolidation_to_rest(const URI&, const Config&, const std::vector<std::string>*) {
467467
throw RestClientDisabledException();
468468
}
469469

tiledb/sm/rest/rest_client_remote.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,11 +1524,14 @@ Status RestClientRemote::ensure_json_null_delimited_string(Buffer* buffer) {
15241524
return Status::Ok();
15251525
}
15261526

1527-
// Defined on RestClientRemote: the override is declared in that class, and the
// RestClient base already provides its own inline (disabled) implementation.
//
// `fragment_uris` is forwarded to the request serializer together with
// `config`; the header documents it as the fragments to consolidate for a
// fragment-list consolidation request.
Status RestClientRemote::post_consolidation_to_rest(
    const URI& uri,
    const Config& config,
    const std::vector<std::string>* fragment_uris) {
  Buffer buff;
  serialization::array_consolidation_request_serialize(
      config, fragment_uris, serialization_type_, &buff);

  // Wrap in a list
  BufferList serialized;
  RETURN_NOT_OK(serialized.add_buffer(std::move(buff)));

tiledb/sm/rest/rest_client_remote.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,10 +425,14 @@ class RestClientRemote : public RestClient {
425425
*
426426
* @param uri Array URI
427427
* @param config config
428+
* @param fragment_uris The uris of the fragments to be consolidated if this
429+
* is a request for fragment list consolidation
428430
* @return
429431
*/
430432
Status post_consolidation_to_rest(
431-
const URI& uri, const Config& config) override;
433+
const URI& uri,
434+
const Config& config,
435+
const std::vector<std::string>* fragment_uris = nullptr) override;
432436

433437
/**
434438
* Post array vacuum request to the REST server.

0 commit comments

Comments
 (0)