Skip to content

Commit fe8d29c

Browse files
committed
Update usages.
1 parent b518fcd commit fe8d29c

35 files changed

+1059
-1428
lines changed

tiledb/sm/array/array.cc

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -679,16 +679,14 @@ void Array::delete_fragments(
679679

680680
// Delete fragments and commits
681681
auto vfs = &(resources.vfs());
682-
throw_if_not_ok(parallel_for(
683-
&resources.compute_tp(), 0, fragment_uris.size(), [&](size_t i) {
684-
throw_if_not_ok(vfs->remove_dir(fragment_uris[i].uri_));
685-
bool is_file = false;
686-
throw_if_not_ok(vfs->is_file(commit_uris_to_delete[i], &is_file));
687-
if (is_file) {
688-
throw_if_not_ok(vfs->remove_file(commit_uris_to_delete[i]));
689-
}
690-
return Status::Ok();
691-
}));
682+
parallel_for(&resources.compute_tp(), 0, fragment_uris.size(), [&](size_t i) {
683+
throw_if_not_ok(vfs->remove_dir(fragment_uris[i].uri_));
684+
bool is_file = false;
685+
throw_if_not_ok(vfs->is_file(commit_uris_to_delete[i], &is_file));
686+
if (is_file) {
687+
throw_if_not_ok(vfs->remove_file(commit_uris_to_delete[i]));
688+
}
689+
});
692690
}
693691

694692
void Array::delete_fragments(
@@ -1711,7 +1709,7 @@ std::unordered_map<std::string, uint64_t> Array::get_average_var_cell_sizes()
17111709

17121710
// Load all metadata for tile var sizes among fragments.
17131711
for (const auto& var_name : var_names) {
1714-
throw_if_not_ok(parallel_for(
1712+
parallel_for(
17151713
&resources_.compute_tp(),
17161714
0,
17171715
fragment_metadata.size(),
@@ -1720,17 +1718,16 @@ std::unordered_map<std::string, uint64_t> Array::get_average_var_cell_sizes()
17201718
// evolution that do not exists in this fragment.
17211719
const auto& schema = fragment_metadata[f]->array_schema();
17221720
if (!schema->is_field(var_name)) {
1723-
return Status::Ok();
1721+
return;
17241722
}
17251723

17261724
fragment_metadata[f]->loaded_metadata()->load_tile_var_sizes(
17271725
*encryption_key(), var_name);
1728-
return Status::Ok();
1729-
}));
1726+
});
17301727
}
17311728

17321729
// Now compute for each var size names, the average cell size.
1733-
throw_if_not_ok(parallel_for(
1730+
parallel_for(
17341731
&resources_.compute_tp(), 0, var_names.size(), [&](const uint64_t n) {
17351732
uint64_t total_size = 0;
17361733
uint64_t cell_num = 0;
@@ -1756,9 +1753,7 @@ std::unordered_map<std::string, uint64_t> Array::get_average_var_cell_sizes()
17561753

17571754
uint64_t average_cell_size = total_size / cell_num;
17581755
ret[var_name] = std::max<uint64_t>(average_cell_size, 1);
1759-
1760-
return Status::Ok();
1761-
}));
1756+
});
17621757

17631758
return ret;
17641759
}
@@ -1988,15 +1983,12 @@ void Array::do_load_metadata() {
19881983

19891984
auto metadata_num = array_metadata_to_load.size();
19901985
std::vector<shared_ptr<Tile>> metadata_tiles(metadata_num);
1991-
throw_if_not_ok(
1992-
parallel_for(&resources_.compute_tp(), 0, metadata_num, [&](size_t m) {
1993-
const auto& uri = array_metadata_to_load[m].uri_;
1994-
1995-
metadata_tiles[m] = GenericTileIO::load(
1996-
resources_, uri, 0, *encryption_key(), memory_tracker_);
1986+
parallel_for(&resources_.compute_tp(), 0, metadata_num, [&](size_t m) {
1987+
const auto& uri = array_metadata_to_load[m].uri_;
19971988

1998-
return Status::Ok();
1999-
}));
1989+
metadata_tiles[m] = GenericTileIO::load(
1990+
resources_, uri, 0, *encryption_key(), memory_tracker_);
1991+
});
20001992

20011993
// Compute array metadata size for the statistics
20021994
uint64_t meta_size = 0;

tiledb/sm/array/array_directory.cc

Lines changed: 66 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ ArrayDirectory::load_all_array_schemas(
168168
auto schema_num = schema_uris.size();
169169
schema_vector.resize(schema_num);
170170

171-
auto status = parallel_for(
171+
parallel_for(
172172
&resources_.get().compute_tp(), 0, schema_num, [&](size_t schema_ith) {
173173
auto& schema_uri = schema_uris[schema_ith];
174174
try {
@@ -182,10 +182,7 @@ ArrayDirectory::load_all_array_schemas(
182182
// when Status gets removed from this module.
183183
throw ArrayDirectoryException(e.what());
184184
}
185-
186-
return Status::Ok();
187185
});
188-
throw_if_not_ok(status);
189186

190187
std::unordered_map<std::string, shared_ptr<ArraySchema>> array_schemas;
191188
for (const auto& schema : schema_vector) {
@@ -209,11 +206,10 @@ ArrayDirectory::load_enumerations_from_paths(
209206

210207
std::vector<shared_ptr<const Enumeration>> ret(enumeration_paths.size());
211208
auto& tp = resources_.get().io_tp();
212-
throw_if_not_ok(parallel_for(&tp, 0, enumeration_paths.size(), [&](size_t i) {
209+
parallel_for(&tp, 0, enumeration_paths.size(), [&](size_t i) {
213210
ret[i] =
214211
load_enumeration(enumeration_paths[i], encryption_key, memory_tracker);
215-
return Status::Ok();
216-
}));
212+
});
217213
return ret;
218214
}
219215

@@ -317,17 +313,15 @@ void ArrayDirectory::delete_fragments_list(
317313
}
318314

319315
// Delete fragments and commits
320-
throw_if_not_ok(parallel_for(
321-
&resources_.get().compute_tp(), 0, uris.size(), [&](size_t i) {
322-
auto& vfs = resources_.get().vfs();
323-
throw_if_not_ok(vfs.remove_dir(uris[i]));
324-
bool is_file = false;
325-
throw_if_not_ok(vfs.is_file(commit_uris_to_delete[i], &is_file));
326-
if (is_file) {
327-
throw_if_not_ok(vfs.remove_file(commit_uris_to_delete[i]));
328-
}
329-
return Status::Ok();
330-
}));
316+
parallel_for(&resources_.get().compute_tp(), 0, uris.size(), [&](size_t i) {
317+
auto& vfs = resources_.get().vfs();
318+
throw_if_not_ok(vfs.remove_dir(uris[i]));
319+
bool is_file = false;
320+
throw_if_not_ok(vfs.is_file(commit_uris_to_delete[i], &is_file));
321+
if (is_file) {
322+
throw_if_not_ok(vfs.remove_file(commit_uris_to_delete[i]));
323+
}
324+
});
331325
}
332326

333327
Status ArrayDirectory::load() {
@@ -342,16 +336,12 @@ Status ArrayDirectory::load() {
342336
// Some processing is also done here for things that don't depend on others.
343337
if (mode_ != ArrayDirectoryMode::SCHEMA_ONLY) {
344338
// List (in parallel) the root directory URIs
345-
tasks.emplace_back(resources_.get().compute_tp().execute([&]() {
346-
root_dir_uris = list_root_dir_uris();
347-
return Status::Ok();
348-
}));
339+
tasks.emplace_back(resources_.get().compute_tp().execute(
340+
[&]() { root_dir_uris = list_root_dir_uris(); }));
349341

350342
// List (in parallel) the commits directory URIs
351-
tasks.emplace_back(resources_.get().compute_tp().execute([&]() {
352-
commits_dir_uris = list_commits_dir_uris();
353-
return Status::Ok();
354-
}));
343+
tasks.emplace_back(resources_.get().compute_tp().execute(
344+
[&]() { commits_dir_uris = list_commits_dir_uris(); }));
355345

356346
// For commits mode, no need to load fragment/array metadata as they
357347
// are not used for commits consolidation/vacuuming.
@@ -360,29 +350,24 @@ Status ArrayDirectory::load() {
360350
tasks.emplace_back(resources_.get().compute_tp().execute([&]() {
361351
fragment_meta_uris_v12_or_higher =
362352
list_fragment_metadata_dir_uris_v12_or_higher();
363-
return Status::Ok();
364353
}));
365354

366355
// Load (in parallel) the array metadata URIs
367-
tasks.emplace_back(resources_.get().compute_tp().execute([&]() {
368-
load_array_meta_uris();
369-
return Status::Ok();
370-
}));
356+
tasks.emplace_back(resources_.get().compute_tp().execute(
357+
[&]() { load_array_meta_uris(); }));
371358
}
372359
}
373360

374361
// No need to load array schemas for commits mode as they are not used for
375362
// commits consolidation/vacuuming.
376363
if (mode_ != ArrayDirectoryMode::COMMITS) {
377364
// Load (in parallel) the array schema URIs
378-
tasks.emplace_back(resources_.get().compute_tp().execute([&]() {
379-
load_array_schema_uris();
380-
return Status::Ok();
381-
}));
365+
tasks.emplace_back(resources_.get().compute_tp().execute(
366+
[&]() { load_array_schema_uris(); }));
382367
}
383368

384369
// Wait for all tasks to complete
385-
RETURN_NOT_OK(resources_.get().compute_tp().wait_all(tasks));
370+
resources_.get().compute_tp().wait_all(tasks);
386371

387372
if (mode_ != ArrayDirectoryMode::COMMITS) {
388373
// Add old array schema, if required.
@@ -935,16 +920,14 @@ ArrayDirectory::compute_fragment_uris_v1_v11(
935920
// Get only the committed fragment uris
936921
std::vector<uint8_t> is_fragment(array_dir_uris.size(), 0);
937922
auto& tp = resources_.get().compute_tp();
938-
auto status = parallel_for(&tp, 0, array_dir_uris.size(), [&](size_t i) {
923+
parallel_for(&tp, 0, array_dir_uris.size(), [&](size_t i) {
939924
if (stdx::string::starts_with(array_dir_uris[i].last_path_part(), "."))
940-
return Status::Ok();
925+
return;
941926
int32_t flag;
942927
throw_if_not_ok(this->is_fragment(
943928
array_dir_uris[i], ok_uris, consolidated_commit_uris_set_, &flag));
944929
is_fragment[i] = (uint8_t)flag;
945-
return Status::Ok();
946930
});
947-
RETURN_NOT_OK_TUPLE(status, nullopt);
948931

949932
for (size_t i = 0; i < array_dir_uris.size(); ++i) {
950933
if (is_fragment[i]) {
@@ -1018,32 +1001,29 @@ ArrayDirectory::compute_uris_to_vacuum(
10181001
std::vector<uint8_t> vac_file_bitmap(uris.size());
10191002
std::vector<uint8_t> overlapping_vac_file_bitmap(uris.size());
10201003
std::vector<uint8_t> non_vac_uri_bitmap(uris.size());
1021-
throw_if_not_ok(parallel_for(
1022-
&resources_.get().compute_tp(), 0, uris.size(), [&](size_t i) {
1023-
auto& uri = uris[i];
1024-
1025-
// Get the start and end timestamp for this fragment
1026-
FragmentID fragment_id{uri};
1027-
auto fragment_timestamp_range{fragment_id.timestamp_range()};
1028-
if (is_vacuum_file(uri)) {
1029-
vac_file_bitmap[i] = 1;
1030-
if (timestamps_overlap(
1031-
fragment_timestamp_range,
1032-
!full_overlap_only &&
1033-
consolidation_with_timestamps_supported(uri))) {
1034-
overlapping_vac_file_bitmap[i] = 1;
1035-
}
1036-
} else {
1037-
if (!timestamps_overlap(
1038-
fragment_timestamp_range,
1039-
!full_overlap_only &&
1040-
consolidation_with_timestamps_supported(uri))) {
1041-
non_vac_uri_bitmap[i] = 1;
1042-
}
1043-
}
1004+
parallel_for(&resources_.get().compute_tp(), 0, uris.size(), [&](size_t i) {
1005+
auto& uri = uris[i];
10441006

1045-
return Status::Ok();
1046-
}));
1007+
// Get the start and end timestamp for this fragment
1008+
FragmentID fragment_id{uri};
1009+
auto fragment_timestamp_range{fragment_id.timestamp_range()};
1010+
if (is_vacuum_file(uri)) {
1011+
vac_file_bitmap[i] = 1;
1012+
if (timestamps_overlap(
1013+
fragment_timestamp_range,
1014+
!full_overlap_only &&
1015+
consolidation_with_timestamps_supported(uri))) {
1016+
overlapping_vac_file_bitmap[i] = 1;
1017+
}
1018+
} else {
1019+
if (!timestamps_overlap(
1020+
fragment_timestamp_range,
1021+
!full_overlap_only &&
1022+
consolidation_with_timestamps_supported(uri))) {
1023+
non_vac_uri_bitmap[i] = 1;
1024+
}
1025+
}
1026+
});
10471027

10481028
auto num_vac_files =
10491029
std::accumulate(vac_file_bitmap.begin(), vac_file_bitmap.end(), 0);
@@ -1076,7 +1056,7 @@ ArrayDirectory::compute_uris_to_vacuum(
10761056
std::vector<int32_t> to_vacuum_vec(uris.size(), 0);
10771057
std::vector<int32_t> to_vacuum_vac_files_vec(vac_files.size(), 0);
10781058
auto& tp = resources_.get().compute_tp();
1079-
auto status = parallel_for(&tp, 0, vac_files.size(), [&](size_t i) {
1059+
parallel_for(&tp, 0, vac_files.size(), [&](size_t i) {
10801060
uint64_t size = 0;
10811061
auto& vfs = resources_.get().vfs();
10821062
throw_if_not_ok(vfs.file_size(vac_files[i], &size));
@@ -1099,10 +1079,7 @@ ArrayDirectory::compute_uris_to_vacuum(
10991079
}
11001080

11011081
to_vacuum_vac_files_vec[i] = vacuum_vac_file;
1102-
1103-
return Status::Ok();
11041082
});
1105-
RETURN_NOT_OK_TUPLE(status, nullopt, nullopt);
11061083

11071084
// Compute the fragment URIs to vacuum
11081085
std::vector<URI> uris_to_vacuum;
@@ -1144,30 +1121,28 @@ ArrayDirectory::compute_filtered_uris(
11441121
std::vector<uint8_t> overlaps_bitmap(uris.size());
11451122
std::vector<std::pair<uint64_t, uint64_t>> fragment_timestamp_ranges(
11461123
uris.size());
1147-
throw_if_not_ok(parallel_for(
1148-
&resources_.get().compute_tp(), 0, uris.size(), [&](size_t i) {
1149-
auto& uri = uris[i];
1150-
std::string short_uri = uri.to_string().substr(base_uri_size);
1151-
if (to_ignore_set.count(short_uri.c_str()) != 0) {
1152-
return Status::Ok();
1153-
}
1124+
parallel_for(&resources_.get().compute_tp(), 0, uris.size(), [&](size_t i) {
1125+
auto& uri = uris[i];
1126+
std::string short_uri = uri.to_string().substr(base_uri_size);
1127+
if (to_ignore_set.count(short_uri.c_str()) != 0) {
1128+
return;
1129+
}
11541130

1155-
// Also ignore any vac uris
1156-
if (is_vacuum_file(uri)) {
1157-
return Status::Ok();
1158-
}
1131+
// Also ignore any vac uris
1132+
if (is_vacuum_file(uri)) {
1133+
return;
1134+
}
11591135

1160-
// Get the start and end timestamp for this fragment
1161-
FragmentID fragment_id{uri};
1162-
fragment_timestamp_ranges[i] = fragment_id.timestamp_range();
1163-
if (timestamps_overlap(
1164-
fragment_timestamp_ranges[i],
1165-
!full_overlap_only &&
1166-
consolidation_with_timestamps_supported(uri))) {
1167-
overlaps_bitmap[i] = 1;
1168-
}
1169-
return Status::Ok();
1170-
}));
1136+
// Get the start and end timestamp for this fragment
1137+
FragmentID fragment_id{uri};
1138+
fragment_timestamp_ranges[i] = fragment_id.timestamp_range();
1139+
if (timestamps_overlap(
1140+
fragment_timestamp_ranges[i],
1141+
!full_overlap_only &&
1142+
consolidation_with_timestamps_supported(uri))) {
1143+
overlaps_bitmap[i] = 1;
1144+
}
1145+
});
11711146

11721147
auto count =
11731148
std::accumulate(overlaps_bitmap.begin(), overlaps_bitmap.end(), 0);

0 commit comments

Comments
 (0)