Skip to content

Commit 2e5d929

Browse files
authored
tdbPartitionedMatrix will automatically close Array's when done reading (#448)
1 parent b1986c7 commit 2e5d929

File tree

5 files changed

+229
-53
lines changed

5 files changed

+229
-53
lines changed

apis/python/test/test_ingestion.py

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -717,9 +717,6 @@ def test_ingestion_timetravel(tmp_path):
717717
second_num_edges = num_edges_history[1]
718718

719719
# Clear all history at timestamp 19.
720-
# With type-erased indexes, we cannot call clear_history() while the index is open because they
721-
# open up a TileDB Array during query(). Deleting fragments while the array is open is not allowed.
722-
index = None
723720
Index.clear_history(uri=index_uri, timestamp=19)
724721

725722
with tiledb.Group(index_uri, "r") as group:
@@ -1118,12 +1115,10 @@ def test_ingestion_with_updates_and_timetravel(tmp_path):
11181115
second_num_edges = num_edges_history[1]
11191116

11201117
# Clear history before the latest ingestion
1121-
latest_ingestion_timestamp = index.latest_ingestion_timestamp
11221118
assert index.latest_ingestion_timestamp == 102
1123-
# With type-erased indexes, we cannot call clear_history() while the index is open because they
1124-
# open up a TileDB Array during query(). Deleting fragments while the array is open is not allowed.
1125-
index = None
1126-
Index.clear_history(uri=index_uri, timestamp=latest_ingestion_timestamp - 1)
1119+
Index.clear_history(
1120+
uri=index_uri, timestamp=index.latest_ingestion_timestamp - 1
1121+
)
11271122

11281123
with tiledb.Group(index_uri, "r") as group:
11291124
assert metadata_to_list(group, "ingestion_timestamps") == [102]
@@ -1166,12 +1161,8 @@ def test_ingestion_with_updates_and_timetravel(tmp_path):
11661161
assert accuracy(result, gt_i, updated_ids=updated_ids) == 1.0
11671162

11681163
# Clear all history
1169-
latest_ingestion_timestamp = index.latest_ingestion_timestamp
11701164
assert index.latest_ingestion_timestamp == 102
1171-
# With type-erased indexes, we cannot call clear_history() while the index is open because they
1172-
# open up a TileDB Array during query(). Deleting fragments while the array is open is not allowed.
1173-
index = None
1174-
Index.clear_history(uri=index_uri, timestamp=latest_ingestion_timestamp)
1165+
Index.clear_history(uri=index_uri, timestamp=index.latest_ingestion_timestamp)
11751166
index = index_class(uri=index_uri, timestamp=1)
11761167
_, result = index.query(queries, k=k, nprobe=partitions)
11771168
assert accuracy(result, gt_i, updated_ids=updated_ids) == 0.0
@@ -1768,11 +1759,7 @@ def test_ivf_flat_ingestion_with_training_source_uri_tdb(tmp_path):
17681759
)
17691760

17701761
# Clear the index history, load, update, and query.
1771-
# With type-erased indexes, we cannot call clear_history() while the index is open because they
1772-
# open up a TileDB Array during query(). Deleting fragments while the array is open is not allowed.
1773-
latest_ingestion_timestamp = index.latest_ingestion_timestamp
1774-
index = None
1775-
Index.clear_history(uri=index_uri, timestamp=latest_ingestion_timestamp - 1)
1762+
Index.clear_history(uri=index_uri, timestamp=index.latest_ingestion_timestamp - 1)
17761763

17771764
index = IVFFlatIndex(uri=index_uri)
17781765

src/include/detail/linalg/tdb_partitioned_matrix.h

Lines changed: 77 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class tdbPartitionedMatrix
128128

129129
// For now, we assume this is always valid so we don't need to add constructor
130130
// arguments to limit it
131-
size_t num_array_rows_{0};
131+
size_t dimensions_{0};
132132

133133
// We don't actually use this
134134
// size_t num_array_cols_{0};
@@ -172,20 +172,38 @@ class tdbPartitionedMatrix
172172
* Column information
173173
****************************************************************************/
174174

175+
// The total number of columns (resident plus non-resident)
176+
unsigned long total_max_cols_{0UL};
177+
175178
// The max number of columns that can fit in allocated memory
176179
size_t column_capacity_{0};
177180

178181
// The number of columns that are currently loaded into memory
179182
size_t num_resident_cols_{0};
180183

181-
// The initial and final index numbers of the resident columns
184+
// The final index number of the resident columns
182185
index_type last_resident_col_{0};
183186

184187
/*****************************************************************************
185188
* Accounting information
186189
****************************************************************************/
187190
size_t max_resident_parts_{0};
188191

192+
/*****************************************************************************
193+
* Closing the arrays
194+
****************************************************************************/
195+
bool closed_ = false;
196+
197+
void close() {
198+
closed_ = true;
199+
if (partitioned_vectors_array_->is_open()) {
200+
partitioned_vectors_array_->close();
201+
}
202+
if (partitioned_ids_array_->is_open()) {
203+
partitioned_ids_array_->close();
204+
}
205+
}
206+
189207
public:
190208
tdbPartitionedMatrix(const tdbPartitionedMatrix&) = delete;
191209
tdbPartitionedMatrix(tdbPartitionedMatrix&&) = default;
@@ -307,20 +325,20 @@ class tdbPartitionedMatrix
307325

308326
auto domain_{partitioned_vectors_schema_.domain()};
309327

310-
auto array_rows_{domain_.dimension(0)};
311-
auto array_cols_{domain_.dimension(1)};
328+
auto array_rows{domain_.dimension(0)};
329+
auto array_cols{domain_.dimension(1)};
312330

313-
num_array_rows_ =
314-
(array_rows_.template domain<row_domain_type>().second -
315-
array_rows_.template domain<row_domain_type>().first + 1);
331+
dimensions_ =
332+
(array_rows.template domain<row_domain_type>().second -
333+
array_rows.template domain<row_domain_type>().first + 1);
316334

317335
// We don't use this. The active partitions naturally limits the number of
318336
// columns that we will read in.
319337
// Comment out for now
320338
#if 0
321339
num_array_cols_ =
322-
(array_cols_.template domain<col_domain_type>().second -
323-
array_cols_.template domain<col_domain_type>().first + 1);
340+
(array_cols.template domain<col_domain_type>().second -
341+
array_cols.template domain<col_domain_type>().first + 1);
324342
#endif
325343

326344
if ((matrix_order_ == TILEDB_ROW_MAJOR && cell_order == TILEDB_COL_MAJOR) ||
@@ -329,12 +347,11 @@ class tdbPartitionedMatrix
329347
}
330348

331349
// indices might not be contiguous, so we need to explicitly add the deltas
332-
auto total_max_cols = 0UL;
333350
size_t max_part_size{0};
334351
for (size_t i = 0; i < total_num_parts_; ++i) {
335352
auto part_size = master_indices_[relevant_parts_[i] + 1] -
336353
master_indices_[relevant_parts_[i]];
337-
total_max_cols += part_size;
354+
total_max_cols_ += part_size;
338355
max_part_size = std::max<size_t>(max_part_size, part_size);
339356
}
340357

@@ -344,8 +361,8 @@ class tdbPartitionedMatrix
344361
std::to_string(upper_bound) + " < " + std::to_string(max_part_size));
345362
}
346363

347-
if (upper_bound == 0 || upper_bound > total_max_cols) {
348-
column_capacity_ = total_max_cols;
364+
if (upper_bound == 0 || upper_bound > total_max_cols_) {
365+
column_capacity_ = total_max_cols_;
349366
} else {
350367
column_capacity_ = upper_bound;
351368
}
@@ -397,9 +414,8 @@ class tdbPartitionedMatrix
397414
* resident at any one time. We use this to size the index of the
398415
* partitioned_matrix base class.
399416
*/
400-
size_t dimension = num_array_rows_;
401417
Base::operator=(
402-
std::move(Base{dimension, column_capacity_, max_resident_parts_}));
418+
std::move(Base{dimensions_, column_capacity_, max_resident_parts_}));
403419
this->num_vectors_ = 0;
404420
this->num_parts_ = 0;
405421

@@ -422,7 +438,8 @@ class tdbPartitionedMatrix
422438

423439
if (this->part_index_.size() != max_resident_parts_ + 1) {
424440
throw std::runtime_error(
425-
"Invalid partitioning, part_index_ size " +
441+
"[tdb_partioned_matrix@load] Invalid partitioning, part_index_ "
442+
"size " +
426443
std::to_string(this->part_index_.size()) +
427444
" != " + std::to_string(max_resident_parts_ + 1));
428445
}
@@ -458,7 +475,8 @@ class tdbPartitionedMatrix
458475
// for, throw.
459476
if (num_resident_cols_ > column_capacity_) {
460477
throw std::runtime_error(
461-
"Invalid partitioning, num_resident_cols_ (" +
478+
"[tdb_partioned_matrix@load] Invalid partitioning, "
479+
"num_resident_cols_ (" +
462480
std::to_string(num_resident_cols_) + ") > column_capacity_ (" +
463481
std::to_string(column_capacity_) + ")");
464482
}
@@ -467,7 +485,8 @@ class tdbPartitionedMatrix
467485
num_resident_parts = last_resident_part_ - first_resident_part;
468486
if (num_resident_parts > max_resident_parts_) {
469487
throw std::runtime_error(
470-
"Invalid partitioning, num_resident_parts " +
488+
"[tdb_partioned_matrix@load] Invalid partitioning, "
489+
"num_resident_parts " +
471490
std::to_string(num_resident_parts) + " > " +
472491
std::to_string(max_resident_parts_));
473492
}
@@ -478,29 +497,41 @@ class tdbPartitionedMatrix
478497
if ((num_resident_cols_ == 0 && num_resident_parts != 0) ||
479498
(num_resident_cols_ != 0 && num_resident_parts == 0)) {
480499
throw std::runtime_error(
481-
"Invalid partitioning, " + std::to_string(num_resident_cols_) +
482-
" resident cols and " + std::to_string(num_resident_parts) +
483-
" resident parts");
500+
"[tdb_partioned_matrix@load] Invalid partitioning, " +
501+
std::to_string(num_resident_cols_) + " resident cols and " +
502+
std::to_string(num_resident_parts) + " resident parts");
484503
}
485504

486505
if (this->part_index_.size() != max_resident_parts_ + 1) {
487506
throw std::runtime_error(
488-
"Invalid partitioning, part_index_ size (" +
507+
"[tdb_partioned_matrix@load] Invalid partitioning, part_index_ "
508+
"size (" +
489509
std::to_string(this->part_index_.size()) +
490510
") != max_resident_parts_ + 1 (" +
491511
std::to_string(max_resident_parts_ + 1) + ")");
492512
}
493513
}
494514

515+
// If closed_ is true, it means we have already read all the data and closed
516+
// our arrays. Note that we could add this at the top of `load()` and
517+
// return false, but the `num_resident_cols_ == 0` check already handles
518+
// this case. So instead we leave this here - it should never be hit, but if
519+
// it is, we'll have an error to investigate, rather than just returning
520+
// false incorrectly.
521+
if (closed_) {
522+
throw std::runtime_error(
523+
"[tdb_partioned_matrix@load] Arrays are closed - this should not "
524+
"happen.");
525+
}
526+
495527
// 2. Load the vectors and IDs.
496528
{
497529
// a. Set up the vectors subarray.
498530
auto attr = partitioned_vectors_schema_.attribute(0);
499531
std::string attr_name = attr.name();
500532
tiledb::Subarray subarray(ctx_, *(this->partitioned_vectors_array_));
501533
// For a 128 dimension vector, Dimension 0 will go from 0 to 127.
502-
auto dimension = num_array_rows_;
503-
subarray.add_range(0, 0, (int)dimension - 1);
534+
subarray.add_range(0, 0, static_cast<int>(dimensions_) - 1);
504535

505536
// b. Set up the IDs subarray.
506537
auto ids_attr = ids_schema_.attribute(0);
@@ -521,22 +552,26 @@ class tdbPartitionedMatrix
521552
ids_subarray.add_range(0, (int)start, (int)stop - 1);
522553
}
523554
if (col_count != last_resident_col_ - first_resident_col) {
524-
throw std::runtime_error("Column count mismatch");
555+
throw std::runtime_error(
556+
"[tdb_partioned_matrix@load] Column count mismatch");
525557
}
526558

527559
// c. Execute the vectors query.
528560
tiledb::Query query(ctx_, *(this->partitioned_vectors_array_));
529561
auto ptr = this->data();
530562
query.set_subarray(subarray)
531563
.set_layout(partitioned_vectors_schema_.cell_order())
532-
.set_data_buffer(attr_name, ptr, col_count * dimension);
564+
.set_data_buffer(attr_name, ptr, col_count * dimensions_);
533565
tiledb_helpers::submit_query(tdb_func__, partitioned_vectors_uri_, query);
534-
_memory_data.insert_entry(tdb_func__, col_count * dimension * sizeof(T));
566+
_memory_data.insert_entry(
567+
tdb_func__, col_count * dimensions_ * sizeof(T));
535568

536569
auto qs = query.query_status();
537570
// @todo Handle incomplete queries.
538571
if (tiledb::Query::Status::COMPLETE != query.query_status()) {
539-
throw std::runtime_error("Query status is not complete -- fix me");
572+
throw std::runtime_error(
573+
"[tdb_partioned_matrix@load] Query status is not complete -- fix "
574+
"me");
540575
}
541576

542577
// d. Execute the IDs query.
@@ -549,7 +584,9 @@ class tdbPartitionedMatrix
549584

550585
// assert(tiledb::Query::Status::COMPLETE == query.query_status());
551586
if (tiledb::Query::Status::COMPLETE != ids_query.query_status()) {
552-
throw std::runtime_error("Query status is not complete -- fix me");
587+
throw std::runtime_error(
588+
"[tdb_partioned_matrix@load] Query status is not complete -- fix "
589+
"me");
553590
}
554591
}
555592

@@ -564,6 +601,12 @@ class tdbPartitionedMatrix
564601
this->num_vectors_ = num_resident_cols_;
565602
this->num_parts_ = num_resident_parts;
566603

604+
if (last_resident_part_ == total_num_parts_ &&
605+
last_resident_col_ == total_max_cols_) {
606+
// We have loaded all the data we can, let's close our Arrays.
607+
close();
608+
}
609+
567610
return true;
568611
}
569612

@@ -572,12 +615,7 @@ class tdbPartitionedMatrix
572615
*/
573616
~tdbPartitionedMatrix() {
574617
// Don't really need these since tiledb::Array will close on destruction
575-
if (partitioned_vectors_array_->is_open()) {
576-
partitioned_vectors_array_->close();
577-
}
578-
if (partitioned_ids_array_->is_open()) {
579-
partitioned_ids_array_->close();
580-
}
618+
close();
581619
}
582620

583621
void debug_tdb_partitioned_matrix(const std::string& msg, size_t max_size) {
@@ -587,7 +625,11 @@ class tdbPartitionedMatrix
587625
debug_vector(squashed_indices_, "# squashed_indices_", max_size);
588626
std::cout << "# total_num_parts_: " << total_num_parts_ << std::endl;
589627
std::cout << "# last_resident_part_: " << last_resident_part_ << std::endl;
628+
std::cout << "# num_parts_: " << this->num_parts_ << std::endl;
629+
630+
std::cout << "# total_max_cols_: " << total_max_cols_ << std::endl;
590631
std::cout << "# column_capacity_: " << column_capacity_ << std::endl;
632+
std::cout << "# num_vectors_: " << this->num_vectors_ << std::endl;
591633
std::cout << "# num_resident_cols_: " << num_resident_cols_ << std::endl;
592634
std::cout << "# last_resident_col_: " << last_resident_col_ << std::endl;
593635
std::cout << "# max_resident_parts_: " << max_resident_parts_ << std::endl;

src/include/test/unit_api_ivf_pq_index.cc

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,56 @@ TEST_CASE("storage_version", "[api_ivf_pq_index]") {
549549
}
550550
}
551551

552+
TEST_CASE("clear history with an open index", "[api_ivf_pq_index]") {
553+
auto ctx = tiledb::Context{};
554+
using feature_type_type = uint8_t;
555+
using id_type_type = uint32_t;
556+
auto feature_type = "uint8";
557+
auto id_type = "uint32";
558+
auto partitioning_index_type = "uint32";
559+
size_t dimensions = 3;
560+
size_t n_list = 1;
561+
size_t num_subspaces = 1;
562+
float convergence_tolerance = 0.00003f;
563+
size_t max_iterations = 3;
564+
565+
std::string index_uri =
566+
(std::filesystem::temp_directory_path() / "api_ivf_pq_index").string();
567+
tiledb::VFS vfs(ctx);
568+
if (vfs.is_dir(index_uri)) {
569+
vfs.remove_dir(index_uri);
570+
}
571+
572+
auto index = IndexIVFPQ(std::make_optional<IndexOptions>(
573+
{{"feature_type", feature_type},
574+
{"id_type", id_type},
575+
{"partitioning_index_type", partitioning_index_type},
576+
{"n_list", std::to_string(n_list)},
577+
{"num_subspaces", std::to_string(num_subspaces)},
578+
{"convergence_tolerance", std::to_string(convergence_tolerance)},
579+
{"max_iterations", std::to_string(max_iterations)}}));
580+
581+
auto training = ColMajorMatrixWithIds<feature_type_type, id_type_type>{
582+
{{1, 1, 1}, {2, 2, 2}, {3, 3, 3}, {4, 4, 4}}, {1, 2, 3, 4}};
583+
auto training_vector_array = FeatureVectorArray(training);
584+
index.train(training_vector_array);
585+
index.add(training_vector_array);
586+
index.write_index(ctx, index_uri, TemporalPolicy(TimeTravel, 99));
587+
588+
auto&& [scores_vector_array, ids_vector_array] =
589+
index.query(QueryType::InfiniteRAM, training_vector_array, 1, 1);
590+
591+
auto second_index = IndexIVFPQ(ctx, index_uri);
592+
auto&& [scores_vector_array_finite, ids_vector_array_finite] =
593+
second_index.query(QueryType::FiniteRAM, training_vector_array, 1, 1);
594+
595+
// Here we check that we can clear_history() even with an index in memory. This
596+
// makes sure that every Array which IndexIVFPQ opens has been closed,
597+
// otherwise clear_history() will throw when it tries to call
598+
// delete_fragments() on the index Arrays.
599+
IndexIVFPQ::clear_history(ctx, index_uri, 99);
600+
}
601+
552602
TEST_CASE("write and load index with timestamps", "[api_ivf_pq_index]") {
553603
auto ctx = tiledb::Context{};
554604
using feature_type_type = uint8_t;

0 commit comments

Comments
 (0)