Skip to content

Commit 798a939

Browse files
authored
Move consolidation calls out of StorageManager. (#4668)
This moves consolidation out of StorageManager using static methods in `tiledb::sm::Consolidator`. The subclasses that derive from Consolidator will still need access to the StorageManager for opening the arrays and groups in the `consolidate` and `vacuum` overrides. --- TYPE: NO_HISTORY DESC: Move consolidation calls out of StorageManager.
1 parent 5804d00 commit 798a939

13 files changed

+312
-361
lines changed

tiledb/sm/c_api/tiledb.cc

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
#include "tiledb/sm/c_api/api_argument_validator.h"
5959
#include "tiledb/sm/config/config.h"
6060
#include "tiledb/sm/config/config_iter.h"
61+
#include "tiledb/sm/consolidator/consolidator.h"
6162
#include "tiledb/sm/cpp_api/core_interface.h"
6263
#include "tiledb/sm/enums/array_type.h"
6364
#include "tiledb/sm/enums/encryption_type.h"
@@ -2702,13 +2703,13 @@ int32_t tiledb_array_create_with_key(
27022703
int32_t tiledb_array_consolidate(
27032704
tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) {
27042705
api::ensure_config_is_valid_if_present(config);
2705-
throw_if_not_ok(ctx->storage_manager()->array_consolidate(
2706+
tiledb::sm::Consolidator::array_consolidate(
27062707
array_uri,
27072708
tiledb::sm::EncryptionType::NO_ENCRYPTION,
27082709
nullptr,
27092710
0,
2710-
(config == nullptr) ? ctx->storage_manager()->config() :
2711-
config->config()));
2711+
(config == nullptr) ? ctx->storage_manager()->config() : config->config(),
2712+
ctx->storage_manager());
27122713
return TILEDB_OK;
27132714
}
27142715

@@ -2721,13 +2722,13 @@ int32_t tiledb_array_consolidate_with_key(
27212722
tiledb_config_t* config) {
27222723
// Sanity checks
27232724

2724-
throw_if_not_ok(ctx->storage_manager()->array_consolidate(
2725+
tiledb::sm::Consolidator::array_consolidate(
27252726
array_uri,
27262727
static_cast<tiledb::sm::EncryptionType>(encryption_type),
27272728
encryption_key,
27282729
key_length,
2729-
(config == nullptr) ? ctx->storage_manager()->config() :
2730-
config->config()));
2730+
(config == nullptr) ? ctx->storage_manager()->config() : config->config(),
2731+
ctx->storage_manager());
27312732

27322733
return TILEDB_OK;
27332734
}
@@ -2747,24 +2748,24 @@ int32_t tiledb_array_consolidate_fragments(
27472748
uris.emplace_back(fragment_uris[i]);
27482749
}
27492750

2750-
throw_if_not_ok(ctx->storage_manager()->fragments_consolidate(
2751+
tiledb::sm::Consolidator::fragments_consolidate(
27512752
array_uri,
27522753
tiledb::sm::EncryptionType::NO_ENCRYPTION,
27532754
nullptr,
27542755
0,
27552756
uris,
2756-
(config == nullptr) ? ctx->storage_manager()->config() :
2757-
config->config()));
2757+
(config == nullptr) ? ctx->storage_manager()->config() : config->config(),
2758+
ctx->storage_manager());
27582759

27592760
return TILEDB_OK;
27602761
}
27612762

27622763
int32_t tiledb_array_vacuum(
27632764
tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) {
2764-
ctx->storage_manager()->array_vacuum(
2765+
tiledb::sm::Consolidator::array_vacuum(
27652766
array_uri,
2766-
(config == nullptr) ? ctx->storage_manager()->config() :
2767-
config->config());
2767+
(config == nullptr) ? ctx->storage_manager()->config() : config->config(),
2768+
ctx->storage_manager());
27682769

27692770
return TILEDB_OK;
27702771
}

tiledb/sm/consolidator/array_meta_consolidator.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,14 @@ class ArrayMetaConsolidator : public Consolidator {
8585
const char* array_name,
8686
EncryptionType encryption_type,
8787
const void* encryption_key,
88-
uint32_t key_length);
88+
uint32_t key_length) override;
8989

9090
/**
9191
* Performs the vacuuming operation.
9292
*
9393
* @param array_name URI of array to consolidate.
9494
*/
95-
void vacuum(const char* array_name);
95+
void vacuum(const char* array_name) override;
9696

9797
private:
9898
/* ********************************* */

tiledb/sm/consolidator/commits_consolidator.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ Status CommitsConsolidator::consolidate(
9595

9696
// Get the file name.
9797
auto& to_consolidate = array_dir.commit_uris_to_consolidate();
98-
storage_manager_->write_consolidated_commits_file(
99-
write_version, array_dir, to_consolidate);
98+
Consolidator::write_consolidated_commits_file(
99+
write_version, array_dir, to_consolidate, storage_manager_);
100100

101101
return Status::Ok();
102102
}

tiledb/sm/consolidator/commits_consolidator.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,14 @@ class CommitsConsolidator : public Consolidator {
8383
const char* array_name,
8484
EncryptionType encryption_type,
8585
const void* encryption_key,
86-
uint32_t key_length);
86+
uint32_t key_length) override;
8787

8888
/**
8989
* Performs the vacuuming operation.
9090
*
9191
* @param array_name URI of array to consolidate.
9292
*/
93-
void vacuum(const char* array_name);
93+
void vacuum(const char* array_name) override;
9494
};
9595

9696
} // namespace tiledb::sm

tiledb/sm/consolidator/consolidator.cc

Lines changed: 201 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,16 @@
3232

3333
#include "tiledb/sm/consolidator/consolidator.h"
3434
#include "tiledb/common/logger.h"
35+
#include "tiledb/common/stdx_string.h"
3536
#include "tiledb/sm/consolidator/array_meta_consolidator.h"
3637
#include "tiledb/sm/consolidator/commits_consolidator.h"
3738
#include "tiledb/sm/consolidator/fragment_consolidator.h"
3839
#include "tiledb/sm/consolidator/fragment_meta_consolidator.h"
3940
#include "tiledb/sm/consolidator/group_meta_consolidator.h"
41+
#include "tiledb/sm/enums/encryption_type.h"
42+
#include "tiledb/sm/rest/rest_client.h"
4043
#include "tiledb/sm/storage_manager/storage_manager.h"
44+
#include "tiledb/storage_format/uri/generate_uri.h"
4145

4246
using namespace tiledb::common;
4347

@@ -121,12 +125,207 @@ Status Consolidator::consolidate(
121125
}
122126

123127
void Consolidator::vacuum([[maybe_unused]] const char* array_name) {
124-
throw Status_ConsolidatorError("Cannot vacuum; Invalid object");
128+
throw ConsolidatorException("Cannot vacuum; Invalid object");
129+
}
130+
131+
void Consolidator::array_consolidate(
132+
const char* array_name,
133+
EncryptionType encryption_type,
134+
const void* encryption_key,
135+
uint32_t key_length,
136+
const Config& config,
137+
StorageManager* storage_manager) {
138+
// Check array URI
139+
URI array_uri(array_name);
140+
if (array_uri.is_invalid()) {
141+
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
142+
}
143+
144+
// Check if array exists
145+
ObjectType obj_type;
146+
throw_if_not_ok(storage_manager->object_type(array_uri, &obj_type));
147+
148+
if (obj_type != ObjectType::ARRAY) {
149+
throw ConsolidatorException(
150+
"Cannot consolidate array; Array does not exist");
151+
}
152+
153+
if (array_uri.is_tiledb()) {
154+
throw_if_not_ok(storage_manager->rest_client()->post_consolidation_to_rest(
155+
array_uri, config));
156+
} else {
157+
// Get encryption key from config
158+
std::string encryption_key_from_cfg;
159+
if (!encryption_key) {
160+
bool found = false;
161+
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
162+
assert(found);
163+
}
164+
165+
if (!encryption_key_from_cfg.empty()) {
166+
encryption_key = encryption_key_from_cfg.c_str();
167+
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
168+
std::string encryption_type_from_cfg;
169+
bool found = false;
170+
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
171+
assert(found);
172+
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
173+
throw_if_not_ok(st);
174+
encryption_type = et.value();
175+
176+
if (!EncryptionKey::is_valid_key_length(
177+
encryption_type,
178+
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
179+
encryption_key = nullptr;
180+
key_length = 0;
181+
}
182+
}
183+
184+
// Consolidate
185+
auto mode = Consolidator::mode_from_config(config);
186+
auto consolidator = Consolidator::create(mode, config, storage_manager);
187+
throw_if_not_ok(consolidator->consolidate(
188+
array_name, encryption_type, encryption_key, key_length));
189+
}
190+
}
191+
192+
void Consolidator::fragments_consolidate(
193+
const char* array_name,
194+
EncryptionType encryption_type,
195+
const void* encryption_key,
196+
uint32_t key_length,
197+
const std::vector<std::string> fragment_uris,
198+
const Config& config,
199+
StorageManager* storage_manager) {
200+
// Check array URI
201+
URI array_uri(array_name);
202+
if (array_uri.is_invalid()) {
203+
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
204+
}
205+
206+
// Check if array exists
207+
ObjectType obj_type;
208+
throw_if_not_ok(storage_manager->object_type(array_uri, &obj_type));
209+
210+
if (obj_type != ObjectType::ARRAY) {
211+
throw ConsolidatorException(
212+
"Cannot consolidate array; Array does not exist");
213+
}
214+
215+
// Get encryption key from config
216+
std::string encryption_key_from_cfg;
217+
if (!encryption_key) {
218+
bool found = false;
219+
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
220+
assert(found);
221+
}
222+
223+
if (!encryption_key_from_cfg.empty()) {
224+
encryption_key = encryption_key_from_cfg.c_str();
225+
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
226+
std::string encryption_type_from_cfg;
227+
bool found = false;
228+
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
229+
assert(found);
230+
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
231+
throw_if_not_ok(st);
232+
encryption_type = et.value();
233+
234+
if (!EncryptionKey::is_valid_key_length(
235+
encryption_type,
236+
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
237+
encryption_key = nullptr;
238+
key_length = 0;
239+
}
240+
}
241+
242+
// Consolidate
243+
auto consolidator = Consolidator::create(
244+
ConsolidationMode::FRAGMENT, config, storage_manager);
245+
auto fragment_consolidator =
246+
dynamic_cast<FragmentConsolidator*>(consolidator.get());
247+
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
248+
array_name, encryption_type, encryption_key, key_length, fragment_uris));
249+
}
250+
251+
/**
 * Writes a single consolidated commits file for the given commit URIs.
 *
 * The file consists of one record per commit URI: the URI relative to the
 * array URI followed by a newline. For URIs ending in the delete-file suffix
 * the record additionally carries the size of the delete file (as a
 * `storage_size_t`) followed by the delete file's contents. The result is
 * written into the commits directory for `write_version`.
 *
 * NOTE(review): `array_dir` is taken by value, so the directory object is
 * copied on each call; changing that requires touching the declaration in
 * the header, which is outside this view.
 *
 * @param write_version Format version used for the file name and directory.
 * @param array_dir Directory of the array the commits belong to.
 * @param commit_uris Commit URIs to consolidate; must not be empty.
 * @param storage_manager Storage manager whose VFS performs the I/O.
 * @throws ConsolidatorException if `commit_uris` is empty.
 */
void Consolidator::write_consolidated_commits_file(
    format_version_t write_version,
    ArrayDirectory array_dir,
    const std::vector<URI>& commit_uris,
    StorageManager* storage_manager) {
  // front()/back() below are undefined on an empty vector.
  if (commit_uris.empty()) {
    throw ConsolidatorException(
        "Cannot write consolidated commits file; No commit URIs provided");
  }

  // Compute the file name.
  auto name = storage_format::generate_consolidated_fragment_name(
      commit_uris.front(), commit_uris.back(), write_version);

  // Compute size of consolidated file. Save the sizes of the files to re-use
  // below.
  storage_size_t total_size = 0;
  const auto base_uri_size = array_dir.uri().to_string().size();
  std::vector<storage_size_t> file_sizes(commit_uris.size());
  for (uint64_t i = 0; i < commit_uris.size(); i++) {
    const auto& uri = commit_uris[i];
    const auto& uri_str = uri.to_string();
    // Relative URI plus the trailing newline.
    total_size += uri_str.size() - base_uri_size + 1;

    // If the file is a delete, add the file size to the count and the size of
    // the size variable.
    if (stdx::string::ends_with(uri_str, constants::delete_file_suffix)) {
      throw_if_not_ok(storage_manager->vfs()->file_size(uri, &file_sizes[i]));
      total_size += file_sizes[i];
      total_size += sizeof(storage_size_t);
    }
  }

  // Write consolidated file, URIs are relative to the array URI.
  // Offsets are formed with data.data() + offset so that a zero-length
  // payload at the very end never indexes past the last element.
  std::vector<uint8_t> data(total_size);
  storage_size_t file_index = 0;
  for (uint64_t i = 0; i < commit_uris.size(); i++) {
    // Add the uri.
    const auto& uri = commit_uris[i];
    const auto& uri_str = uri.to_string();
    std::string relative_uri = uri_str.substr(base_uri_size) + "\n";
    memcpy(data.data() + file_index, relative_uri.data(), relative_uri.size());
    file_index += relative_uri.size();

    // For deletes, read the delete condition to the output file.
    if (stdx::string::ends_with(uri_str, constants::delete_file_suffix)) {
      memcpy(data.data() + file_index, &file_sizes[i], sizeof(storage_size_t));
      file_index += sizeof(storage_size_t);
      throw_if_not_ok(storage_manager->vfs()->read(
          uri, 0, data.data() + file_index, file_sizes[i]));
      file_index += file_sizes[i];
    }
  }

  // Write the file to storage.
  URI consolidated_commits_uri =
      array_dir.get_commits_dir(write_version)
          .join_path(name + constants::con_commits_file_suffix);
  throw_if_not_ok(storage_manager->vfs()->write(
      consolidated_commits_uri, data.data(), data.size()));
  throw_if_not_ok(storage_manager->vfs()->close_file(consolidated_commits_uri));
}
308+
309+
void Consolidator::array_vacuum(
310+
const char* array_name,
311+
const Config& config,
312+
StorageManager* storage_manager) {
313+
URI array_uri(array_name);
314+
if (array_uri.is_tiledb()) {
315+
throw_if_not_ok(
316+
storage_manager->rest_client()->post_vacuum_to_rest(array_uri, config));
317+
return;
318+
}
319+
320+
auto mode = Consolidator::mode_from_config(config, true);
321+
auto consolidator = Consolidator::create(mode, config, storage_manager);
322+
consolidator->vacuum(array_name);
125323
}
126324

127325
void Consolidator::check_array_uri(const char* array_name) {
128326
if (URI(array_name).is_tiledb()) {
129-
throw std::logic_error("Consolidation is not supported for remote arrays.");
327+
throw ConsolidatorException(
328+
"Consolidation is not supported for remote arrays.");
130329
}
131330
}
132331

0 commit comments

Comments
 (0)