Skip to content

Commit 1e5c750

Browse files
support consolidation with max_frag_size in Dense
1 parent 223aad8 commit 1e5c750

File tree

4 files changed

+165
-3
lines changed

4 files changed

+165
-3
lines changed

test/src/unit-cppapi-max-fragment-size.cc

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,3 +503,95 @@ TEST_CASE(
503503

504504
array.close();
505505
}
506+
507+
TEST_CASE(
508+
"Setting max_fragment_size in Dense consolidation",
509+
"[global-order-writer]") {
510+
std::string array_name = "cpp_max_fragment_size_bug";
511+
Context ctx;
512+
513+
auto cleanup = [&]() {
514+
auto obj = Object::object(ctx, array_name);
515+
if (obj.type() == Object::Type::Array) {
516+
Object::remove(ctx, array_name);
517+
}
518+
};
519+
520+
cleanup();
521+
522+
// Remove the array at the end of this test.
523+
ScopedExecutor deferred(cleanup);
524+
525+
// Create an array with exactly 9 tiles and tile extend 1
526+
Domain domain(ctx);
527+
ArraySchema schema(ctx, TILEDB_DENSE);
528+
auto d1 = tiledb::Dimension::create<int32_t>(ctx, "d1", {{0, 2}}, 1);
529+
auto d2 = tiledb::Dimension::create<int32_t>(ctx, "d2", {{0, 2}}, 1);
530+
domain.add_dimension(d1);
531+
domain.add_dimension(d2);
532+
533+
auto a1 = tiledb::Attribute::create<int32_t>(ctx, "a");
534+
schema.add_attribute(a1);
535+
536+
schema.set_order({{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
537+
schema.set_domain(domain);
538+
539+
Array::create(array_name, schema);
540+
541+
// Populate array with data from 1 to 9
542+
int value = 0;
543+
for (int i = 0; i < 3; i++) {
544+
Array array(ctx, array_name, TILEDB_WRITE);
545+
Query query(ctx, array);
546+
query.set_layout(TILEDB_ROW_MAJOR);
547+
tiledb::Subarray sub(ctx, array);
548+
sub.set_subarray({i, i, 0, 2});
549+
query.set_subarray(sub);
550+
std::vector<int32_t> data = {++value, ++value, ++value};
551+
query.set_data_buffer("a", data);
552+
query.submit();
553+
array.close();
554+
}
555+
556+
// Read data to validate write and num of fragments.
557+
CHECK(tiledb::test::num_fragments(array_name) == 3);
558+
Array array(ctx, array_name, TILEDB_READ);
559+
const std::vector<int32_t> subarray = {0, 2, 0, 2};
560+
std::vector<int32_t> a(9);
561+
Query query(ctx, array, TILEDB_READ);
562+
query.set_subarray(subarray)
563+
.set_layout(TILEDB_ROW_MAJOR)
564+
.set_data_buffer("a", a);
565+
query.submit();
566+
array.close();
567+
568+
for (int i = 0; i < 9; i++) {
569+
CHECK(a[i] == i + 1);
570+
}
571+
572+
// Consolidate with a size limitation for the fragment. This will result in
573+
// the creation of two new fragments.
574+
tiledb::Config cfg;
575+
cfg.set("sm.consolidation.max_fragment_size", "150");
576+
ctx = Context(cfg);
577+
Array::consolidate(ctx, array_name);
578+
Array::vacuum(ctx, array_name);
579+
580+
// Check that we now have 2 fragments instead of 3
581+
CHECK(tiledb::test::num_fragments(array_name) == 2);
582+
583+
// Read data to validate correctness
584+
Array array2(ctx, array_name, TILEDB_READ);
585+
const std::vector<int32_t> subarray2 = {0, 2, 0, 2};
586+
std::vector<int32_t> a2(9);
587+
Query query2(ctx, array2, TILEDB_READ);
588+
query2.set_subarray(subarray2)
589+
.set_layout(TILEDB_ROW_MAJOR)
590+
.set_data_buffer("a", a2);
591+
query2.submit();
592+
array2.close();
593+
594+
for (int i = 0; i < 9; i++) {
595+
CHECK(a2[i] == i + 1);
596+
}
597+
}

tiledb/sm/fragment/fragment_metadata.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,8 +695,8 @@ void FragmentMetadata::init_domain(const NDRange& non_empty_domain) {
695695

696696
// Sanity check
697697
assert(!non_empty_domain.empty());
698-
assert(non_empty_domain_.empty());
699-
assert(domain_.empty());
698+
// assert(non_empty_domain_.empty()); todo, this might cause problems
699+
// assert(domain_.empty());
700700

701701
// Set non-empty domain for dense arrays (for sparse it will be calculated
702702
// via the MBRs)

tiledb/sm/query/writers/global_order_writer.cc

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "tiledb/sm/array_schema/array_schema.h"
3939
#include "tiledb/sm/array_schema/dimension.h"
4040
#include "tiledb/sm/consolidator/consolidator.h"
41+
#include "tiledb/sm/enums/array_type.h"
4142
#include "tiledb/sm/fragment/fragment_metadata.h"
4243
#include "tiledb/sm/misc/comparators.h"
4344
#include "tiledb/sm/misc/hilbert.h"
@@ -91,7 +92,8 @@ GlobalOrderWriter::GlobalOrderWriter(
9192
fragment_name)
9293
, processed_conditions_(processed_conditions)
9394
, fragment_size_(fragment_size)
94-
, current_fragment_size_(0) {
95+
, current_fragment_size_(0)
96+
, rows_written_(0) {
9597
// Check the layout is global order.
9698
if (layout_ != Layout::GLOBAL_ORDER) {
9799
throw GlobalOrderWriterException(
@@ -781,6 +783,13 @@ Status GlobalOrderWriter::global_write() {
781783
// Compute the number of tiles that will fit in this fragment.
782784
auto num = num_tiles_to_write(idx, tile_num, tiles);
783785

786+
if (tile_num != num && array_schema_.array_type() == ArrayType::DENSE) {
787+
// if it is a dense array and not all tiles can fit in the current
788+
// fragment then we need to split the domain
789+
NDRange new_nd = ndranges_after_split(num);
790+
frag_meta->init_domain(new_nd);
791+
}
792+
784793
// If we're resuming a fragment write and the first tile doesn't fit into
785794
// the previous fragment, we need to start a new fragment and recalculate
786795
// the number of tiles to write.
@@ -1422,6 +1431,52 @@ uint64_t GlobalOrderWriter::num_tiles_to_write(
14221431
return tile_num - start;
14231432
}
14241433

1434+
uint64_t GlobalOrderWriter::num_tiles_per_row() {
1435+
auto dim_num = array_schema_.dim_num();
1436+
uint64_t ret = 1;
1437+
for (unsigned d = 1; d < dim_num; ++d) {
1438+
// skip first dim. todo Explain
1439+
auto dim{array_schema_.dimension_ptr(d)};
1440+
auto dim_dom = dim->domain();
1441+
ret *= dim->domain_range(dim_dom);
1442+
}
1443+
return ret;
1444+
}
1445+
1446+
NDRange GlobalOrderWriter::ndranges_after_split(uint64_t num) {
1447+
uint64_t tiles_per_row = num_tiles_per_row();
1448+
auto dim_num = array_schema_.dim_num();
1449+
NDRange nd;
1450+
nd.reserve(dim_num);
1451+
1452+
if (num % tiles_per_row != 0) {
1453+
throw GlobalOrderWriterException(
1454+
"This fragment target size is not possible please try something else "); // todo fix
1455+
}
1456+
1457+
// Calculate how many rows we will write in the current fragment
1458+
uint64_t rows_to_write = num / tiles_per_row;
1459+
1460+
// Create the range for the index dim (first).
1461+
int start = rows_written_;
1462+
int end = start + rows_to_write - 1;
1463+
Range range(&start, &end, sizeof(int));
1464+
nd.emplace_back(range);
1465+
1466+
// Use the domain as ranges for the rest of the dims
1467+
for (unsigned d = 1; d < dim_num; ++d) {
1468+
// begin from second dim
1469+
auto dim{array_schema_.dimension_ptr(d)};
1470+
auto dim_dom = dim->domain();
1471+
nd.emplace_back(dim_dom);
1472+
}
1473+
1474+
// add rows written to the cache
1475+
rows_written_ += rows_to_write;
1476+
1477+
return nd;
1478+
}
1479+
14251480
Status GlobalOrderWriter::start_new_fragment() {
14261481
auto frag_meta = global_write_state_->frag_meta_;
14271482
auto& uri = frag_meta->fragment_uri();

tiledb/sm/query/writers/global_order_writer.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,12 @@ class GlobalOrderWriter : public WriterBase {
215215
*/
216216
uint64_t current_fragment_size_;
217217

218+
/**
219+
* Counter for the number of rows written. This is used only when the
220+
* consolidation produces more than one fragment in Dense arrays
221+
*/
222+
uint64_t rows_written_;
223+
218224
/* ********************************* */
219225
/* PRIVATE METHODS */
220226
/* ********************************* */
@@ -385,6 +391,15 @@ class GlobalOrderWriter : public WriterBase {
385391
uint64_t tile_num,
386392
tdb::pmr::unordered_map<std::string, WriterTileTupleVector>& tiles);
387393

394+
/**
395+
* Return the number of tiles a single row can hold
396+
*
397+
* @return Number of tiles.
398+
*/
399+
NDRange ndranges_after_split(uint64_t num);
400+
401+
uint64_t num_tiles_per_row();
402+
388403
/**
389404
* Close the current fragment and start a new one. The closed fragment will
390405
* be added to `frag_uris_to_commit_` so that all fragments in progress can

0 commit comments

Comments
 (0)