Skip to content

Commit 124ce98

Browse files
committed
Support remote arrays for fragment_list consolidation
1 parent 1326ed4 commit 124ce98

File tree

9 files changed

+336
-134
lines changed

9 files changed

+336
-134
lines changed

test/src/unit-capi-consolidation.cc

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
#include <test/support/tdb_catch.h>
3434
#include "test/support/src/helpers.h"
35+
#include "test/support/src/serialization_wrappers.h"
3536
#include "test/support/src/vfs_helpers.h"
3637
#include "tiledb/common/stdx_string.h"
3738
#include "tiledb/platform/platform.h"
@@ -72,6 +73,8 @@ struct ConsolidationFx {
7273
tiledb_encryption_type_t encryption_type_ = TILEDB_NO_ENCRYPTION;
7374
const char* encryption_key_ = nullptr;
7475

76+
bool serialize_ = false;
77+
7578
// Constructors/destructors
7679
ConsolidationFx();
7780

@@ -5081,6 +5084,10 @@ TEST_CASE_METHOD(
50815084
ConsolidationFx,
50825085
"C API: Test advanced consolidation #1",
50835086
"[capi][consolidation][adv][adv-1][non-rest]") {
5087+
#ifdef TILEDB_SERIALIZATION
5088+
serialize_ = true;
5089+
#endif
5090+
50845091
remove_dense_vector();
50855092
create_dense_vector();
50865093
write_dense_vector_4_fragments();
@@ -5113,6 +5120,13 @@ TEST_CASE_METHOD(
51135120
REQUIRE(rc == TILEDB_OK);
51145121
REQUIRE(error == nullptr);
51155122

5123+
if (serialize_) {
5124+
std::vector<std::string> frag_uris_deserialized;
5125+
tiledb_array_consolidation_request_wrapper(
5126+
ctx_, tiledb_serialization_type_t(0), nullptr, &frag_uris_deserialized);
5127+
REQUIRE(frag_uris_deserialized.empty());
5128+
}
5129+
51165130
// Consolidate
51175131
rc = tiledb_array_consolidate(ctx_, dense_vector_uri_.c_str(), config);
51185132
CHECK(rc == TILEDB_OK);
@@ -7158,7 +7172,11 @@ TEST_CASE_METHOD(
71587172
TEST_CASE_METHOD(
71597173
ConsolidationFx,
71607174
"C API: Test consolidation, dense split fragments",
7161-
"[capi][consolidation][dense][split-fragments][non-rest]") {
7175+
"[capi][consolidation][dense][split-fragments][rest]") {
7176+
#ifdef TILEDB_SERIALIZATION
7177+
serialize_ = true;
7178+
#endif
7179+
71627180
remove_dense_array();
71637181
create_dense_array();
71647182
write_dense_subarray(1, 2, 1, 2);
@@ -7198,6 +7216,23 @@ TEST_CASE_METHOD(
71987216

71997217
// Consolidate
72007218
const char* uris[2] = {strrchr(uri, '/') + 1, strrchr(uri2, '/') + 1};
7219+
7220+
if (serialize_) {
7221+
std::vector<std::string> frag_uris;
7222+
frag_uris.reserve(2);
7223+
for (uint64_t i = 0; i < 2; i++) {
7224+
frag_uris.emplace_back(uris[i]);
7225+
}
7226+
7227+
std::vector<std::string> frag_uris_deserialized;
7228+
tiledb_array_consolidation_request_wrapper(
7229+
ctx_,
7230+
tiledb_serialization_type_t(0),
7231+
&frag_uris,
7232+
&frag_uris_deserialized);
7233+
REQUIRE(frag_uris == frag_uris_deserialized);
7234+
}
7235+
72017236
rc = tiledb_array_consolidate_fragments(
72027237
ctx_, dense_array_uri_.c_str(), uris, 2, cfg);
72037238
CHECK(rc == TILEDB_OK);
@@ -7234,7 +7269,11 @@ TEST_CASE_METHOD(
72347269
TEST_CASE_METHOD(
72357270
ConsolidationFx,
72367271
"C API: Test consolidation, sparse split fragments",
7237-
"[capi][consolidation][sparse][split-fragments][non-rest]") {
7272+
"[capi][consolidation][sparse][split-fragments][rest]") {
7273+
#ifdef TILEDB_SERIALIZATION
7274+
serialize_ = true;
7275+
#endif
7276+
72387277
remove_sparse_array();
72397278
create_sparse_array();
72407279
write_sparse_row(0);
@@ -7274,6 +7313,23 @@ TEST_CASE_METHOD(
72747313

72757314
// Consolidate
72767315
const char* uris[2] = {strrchr(uri, '/') + 1, strrchr(uri2, '/') + 1};
7316+
7317+
if (serialize_) {
7318+
std::vector<std::string> frag_uris;
7319+
frag_uris.reserve(2);
7320+
for (uint64_t i = 0; i < 2; i++) {
7321+
frag_uris.emplace_back(uris[i]);
7322+
}
7323+
7324+
std::vector<std::string> frag_uris_deserialized;
7325+
tiledb_array_consolidation_request_wrapper(
7326+
ctx_,
7327+
tiledb_serialization_type_t(0),
7328+
&frag_uris,
7329+
&frag_uris_deserialized);
7330+
REQUIRE(frag_uris == frag_uris_deserialized);
7331+
}
7332+
72777333
rc = tiledb_array_consolidate_fragments(
72787334
ctx_, sparse_array_uri_.c_str(), uris, 2, cfg);
72797335
CHECK(rc == TILEDB_OK);

test/support/src/serialization_wrappers.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@
3232
*/
3333

3434
#include "test/support/src/helpers.h"
35+
#include "tiledb/api/c_api/buffer/buffer_api_internal.h"
36+
#include "tiledb/api/c_api/context/context_api_internal.h"
3537
#include "tiledb/sm/c_api/tiledb.h"
3638
#include "tiledb/sm/c_api/tiledb_serialization.h"
3739
#include "tiledb/sm/c_api/tiledb_struct_def.h"
40+
#include "tiledb/sm/serialization/consolidation.h"
3841
#include "tiledb/sm/serialization/query.h"
3942

4043
#ifdef TILEDB_SERIALIZATION
@@ -218,3 +221,28 @@ void tiledb_subarray_serialize(
218221
*subarray = deserialized_subarray;
219222
#endif
220223
}
224+
225+
/**
 * Round-trips an array consolidation request through serialization.
 *
 * Serializes the context's config together with the given fragment URI list
 * into a temporary buffer handle, deserializes that buffer again, and hands
 * back the fragment URIs recovered from it.
 *
 * @param ctx Context whose config is serialized as part of the request.
 * @param serialize_type Serialization format; cast to
 *     `tiledb::sm::SerializationType` before use.
 * @param fragment_uris_in Fragment URIs to serialize. Forwarded unchanged to
 *     the serializer, so a null pointer is passed through as-is.
 * @param fragment_uris_out Receives the deserialized fragment URIs. It is
 *     cleared when the deserialized request carries no fragment list, so the
 *     caller never observes stale contents. Ignored when null.
 */
void tiledb_array_consolidation_request_wrapper(
    tiledb_ctx_t* ctx,
    tiledb_serialization_type_t serialize_type,
    const std::vector<std::string>* fragment_uris_in,
    std::vector<std::string>* fragment_uris_out) {
  const auto format =
      static_cast<tiledb::sm::SerializationType>(serialize_type);

  auto buffer = tiledb_buffer_handle_t::make_handle();
  try {
    // Serialize
    serialization::array_consolidation_request_serialize(
        ctx->config(), fragment_uris_in, format, &(buffer->buffer()));

    // Deserialize
    auto [config, fragment_uris_deser] =
        serialization::array_consolidation_request_deserialize(
            format, buffer->buffer());

    if (fragment_uris_out != nullptr) {
      if (fragment_uris_deser.has_value()) {
        *fragment_uris_out = fragment_uris_deser.value();
      } else {
        fragment_uris_out->clear();
      }
    }
  } catch (...) {
    // Release the handle before propagating; otherwise a failing
    // (de)serialization call would leak it.
    tiledb_buffer_handle_t::break_handle(buffer);
    throw;
  }

  tiledb_buffer_handle_t::break_handle(buffer);
}

test/support/src/serialization_wrappers.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,11 @@ int tiledb_fragment_info_serialize(
141141
*/
142142
void tiledb_subarray_serialize(
143143
tiledb_ctx_t* ctx, tiledb_array_t* array, tiledb_subarray_t** subarray);
144+
145+
/**
 * Serializes an array consolidation request and immediately deserializes it
 * again, so tests can check that the request survives the round trip.
 *
 * @param ctx Context whose config is serialized as part of the request.
 * @param serialize_type Serialization format to use for both directions.
 * @param fragment_uris_in Fragment URIs to place in the serialized request.
 * @param fragment_uris_out Receives the fragment URIs recovered from the
 *     deserialized request.
 */
void tiledb_array_consolidation_request_wrapper(
146+
tiledb_ctx_t* ctx,
147+
tiledb_serialization_type_t serialize_type,
148+
const std::vector<std::string>* fragment_uris_in,
149+
std::vector<std::string>* fragment_uris_out);
150+
144151
#endif // TILEDB_TEST_SERIALIZATION_WRAPPERS_H

tiledb/sm/c_api/tiledb.cc

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2664,13 +2664,31 @@ int32_t tiledb_array_create_with_key(
26642664

26652665
/**
 * Consolidates the array at `array_uri` without an encryption key.
 *
 * The consolidation config is `config` when one is supplied, otherwise the
 * context's own config. Throws `api::CAPIStatusException` when the array URI
 * is invalid, or when the URI is a TileDB (remote) URI and the configured
 * consolidation mode is FRAGMENT; that combination must go through
 * `tiledb_array_consolidate_fragments` instead. Returns `TILEDB_OK` once
 * `Consolidator::array_consolidate` has returned.
 */
int32_t tiledb_array_consolidate(
26662666
tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) {
2667+
// Validate input arguments
2668+
api::ensure_context_is_valid(ctx);
26672669
api::ensure_config_is_valid_if_present(config);
2670+
2671+
// Reject an unusable array URI up front with a C API level error.
auto uri = tiledb::sm::URI(array_uri);
2672+
if (uri.is_invalid()) {
2673+
throw api::CAPIStatusException(
2674+
"Failed to consolidate fragments; Invalid input array uri");
2675+
}
2676+
2677+
// Resolve the effective config once; it is used both for the mode check
// below and for the consolidation call itself.
auto input_config = (config == nullptr) ? ctx->config() : config->config();
2678+
// Fragment-mode consolidation of a remote array is not accepted through this
// entry point; the mode is only inspected when the URI is a TileDB URI.
if (uri.is_tiledb() &&
2679+
tiledb::sm::Consolidator::mode_from_config(input_config) ==
2680+
tiledb::sm::ConsolidationMode::FRAGMENT) {
2681+
throw api::CAPIStatusException(
2682+
"Please use tiledb_array_consolidate_fragments API for consolidating "
2683+
"fragments on remote arrays.");
2684+
}
2685+
26682686
// Consolidate with no encryption: NO_ENCRYPTION type, null key, zero length.
tiledb::sm::Consolidator::array_consolidate(
26692687
array_uri,
26702688
tiledb::sm::EncryptionType::NO_ENCRYPTION,
26712689
nullptr,
26722690
0,
2673-
// Diff note: the line marked `2673-` is the pre-commit argument; the commit
// replaces it with `input_config` (marked `2691+`).
(config == nullptr) ? ctx->config() : config->config(),
2691+
input_config,
26742692
ctx->storage_manager());
26752693
return TILEDB_OK;
26762694
}
@@ -2682,7 +2700,15 @@ int32_t tiledb_array_consolidate_with_key(
26822700
const void* encryption_key,
26832701
uint32_t key_length,
26842702
tiledb_config_t* config) {
2685-
// Sanity checks
2703+
// Validate input arguments
2704+
api::ensure_context_is_valid(ctx);
2705+
api::ensure_config_is_valid_if_present(config);
2706+
2707+
auto uri = tiledb::sm::URI(array_uri);
2708+
if (uri.is_invalid()) {
2709+
throw api::CAPIStatusException(
2710+
"Failed to consolidate fragments; Invalid input array uri");
2711+
}
26862712

26872713
tiledb::sm::Consolidator::array_consolidate(
26882714
array_uri,
@@ -2701,7 +2727,33 @@ int32_t tiledb_array_consolidate_fragments(
27012727
const char** fragment_uris,
27022728
const uint64_t num_fragments,
27032729
tiledb_config_t* config) {
2704-
// Sanity checks
2730+
// Validate input arguments
2731+
api::ensure_context_is_valid(ctx);
2732+
api::ensure_config_is_valid_if_present(config);
2733+
2734+
if (fragment_uris == nullptr) {
2735+
throw api::CAPIStatusException(
2736+
"Failed to consolidate fragments; Invalid input fragment list");
2737+
}
2738+
2739+
auto uri = tiledb::sm::URI(array_uri);
2740+
if (uri.is_invalid()) {
2741+
throw api::CAPIStatusException(
2742+
"Failed to consolidate fragments; Invalid input array uri");
2743+
}
2744+
2745+
if (num_fragments < 1) {
2746+
throw api::CAPIStatusException(
2747+
"Failed to consolidate fragments; Invalid input number of fragments");
2748+
}
2749+
2750+
for (size_t i = 0; i < num_fragments; i++) {
2751+
if (tiledb::sm::URI(fragment_uris[i]).is_invalid()) {
2752+
throw api::CAPIStatusException(
2753+
"Failed to consolidate fragments; Invalid uri(s) in input fragment "
2754+
"list");
2755+
}
2756+
}
27052757

27062758
// Convert the list of fragments to a vector
27072759
std::vector<std::string> uris;

tiledb/sm/consolidator/consolidator.cc

Lines changed: 57 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -146,21 +146,21 @@ void Consolidator::array_consolidate(
146146
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
147147
}
148148

149-
// Check if array exists
150-
ObjectType obj_type;
151-
throw_if_not_ok(
152-
object_type(storage_manager->resources(), array_uri, &obj_type));
153-
154-
if (obj_type != ObjectType::ARRAY) {
155-
throw ConsolidatorException(
156-
"Cannot consolidate array; Array does not exist");
157-
}
158-
159149
if (array_uri.is_tiledb()) {
160150
throw_if_not_ok(
161151
storage_manager->resources().rest_client()->post_consolidation_to_rest(
162152
array_uri, config));
163153
} else {
154+
// Check if array exists
155+
ObjectType obj_type;
156+
throw_if_not_ok(
157+
object_type(storage_manager->resources(), array_uri, &obj_type));
158+
159+
if (obj_type != ObjectType::ARRAY) {
160+
throw ConsolidatorException(
161+
"Cannot consolidate array; Array does not exist");
162+
}
163+
164164
// Get encryption key from config
165165
std::string encryption_key_from_cfg;
166166
if (!encryption_key) {
@@ -210,50 +210,58 @@ void Consolidator::fragments_consolidate(
210210
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
211211
}
212212

213-
// Check if array exists
214-
ObjectType obj_type;
215-
throw_if_not_ok(
216-
object_type(storage_manager->resources(), array_uri, &obj_type));
213+
if (array_uri.is_tiledb()) {
214+
throw_if_not_ok(
215+
storage_manager->resources().rest_client()->post_consolidation_to_rest(
216+
array_uri, config, &fragment_uris));
217+
} else {
218+
// Check if array exists
219+
ObjectType obj_type;
220+
throw_if_not_ok(
221+
object_type(storage_manager->resources(), array_uri, &obj_type));
217222

218-
if (obj_type != ObjectType::ARRAY) {
219-
throw ConsolidatorException(
220-
"Cannot consolidate array; Array does not exist");
221-
}
223+
if (obj_type != ObjectType::ARRAY) {
224+
throw ConsolidatorException(
225+
"Cannot consolidate array; Array does not exist");
226+
}
222227

223-
// Get encryption key from config
224-
std::string encryption_key_from_cfg;
225-
if (!encryption_key) {
226-
bool found = false;
227-
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
228-
assert(found);
229-
}
228+
// Get encryption key from config
229+
std::string encryption_key_from_cfg;
230+
if (!encryption_key) {
231+
bool found = false;
232+
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
233+
assert(found);
234+
}
235+
236+
if (!encryption_key_from_cfg.empty()) {
237+
encryption_key = encryption_key_from_cfg.c_str();
238+
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
239+
std::string encryption_type_from_cfg;
240+
bool found = false;
241+
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
242+
assert(found);
243+
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
244+
throw_if_not_ok(st);
245+
encryption_type = et.value();
230246

231-
if (!encryption_key_from_cfg.empty()) {
232-
encryption_key = encryption_key_from_cfg.c_str();
233-
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
234-
std::string encryption_type_from_cfg;
235-
bool found = false;
236-
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
237-
assert(found);
238-
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
239-
throw_if_not_ok(st);
240-
encryption_type = et.value();
241-
242-
if (!EncryptionKey::is_valid_key_length(
243-
encryption_type,
244-
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
245-
encryption_key = nullptr;
246-
key_length = 0;
247+
if (!EncryptionKey::is_valid_key_length(
248+
encryption_type,
249+
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
250+
encryption_key = nullptr;
251+
key_length = 0;
252+
}
247253
}
248-
}
249254

250-
// Consolidate
251-
auto consolidator = Consolidator::create(
252-
ConsolidationMode::FRAGMENT, config, storage_manager);
253-
auto fragment_consolidator =
254-
dynamic_cast<FragmentConsolidator*>(consolidator.get());
255-
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
256-
array_name, encryption_type, encryption_key, key_length, fragment_uris));
255+
// Consolidate
256+
auto fragment_consolidator =
257+
make_shared<FragmentConsolidator>(HERE(), config, storage_manager);
258+
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
259+
array_name,
260+
encryption_type,
261+
encryption_key,
262+
key_length,
263+
fragment_uris));
264+
}
257265
}
258266

259267
void Consolidator::write_consolidated_commits_file(

0 commit comments

Comments
 (0)