Skip to content

Commit c51d0b2

Browse files
authored
Add support for ingest() with index_timestamp for Vamana type-erased index (#341)
1 parent 5aee8a2 commit c51d0b2

File tree

13 files changed

+95
-65
lines changed

apis/python/src/tiledb/vector_search/ingestion.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def ingest(
9090
updates_uri: str
9191
Updates
9292
index_timestamp: int
93-
Timestamp to use for writing and reading data. By default it sues the current unix ms timestamp.
93+
Timestamp to use for writing and reading data. By default it uses the current unix ms timestamp.
9494
config: None
9595
config dictionary, defaults to None
9696
namespace: str
@@ -1633,7 +1633,7 @@ def ingest_vamana(
16331633
data = vspy.FeatureVectorArray(ctx, parts_array_uri, ids_array_uri)
16341634
index.train(data)
16351635
index.add(data)
1636-
index.write_index(ctx, index_group_uri)
1636+
index.write_index(ctx, index_group_uri, index_timestamp)
16371637

16381638
def write_centroids(
16391639
centroids: np.ndarray,

apis/python/src/tiledb/vector_search/ivf_flat_index.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ def query_internal(
141141
resources: Optional[Mapping[str, Any]] = None,
142142
num_partitions: int = -1,
143143
num_workers: int = -1,
144+
**kwargs,
144145
):
145146
"""
146147
Query an IVF_FLAT index

apis/python/src/tiledb/vector_search/type_erased_module.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,11 +284,13 @@ void init_type_erased_module(py::module_& m) {
284284
[](IndexVamana& index,
285285
const tiledb::Context& ctx,
286286
const std::string& group_uri,
287+
std::optional<size_t> timestamp,
287288
const std::string& storage_version) {
288-
index.write_index(ctx, group_uri, storage_version);
289+
index.write_index(ctx, group_uri, timestamp, storage_version);
289290
},
290291
py::arg("ctx"),
291292
py::arg("group_uri"),
293+
py::arg("timestamp") = py::none(),
292294
py::arg("storage_version") = "")
293295
.def("feature_type_string", &IndexVamana::feature_type_string)
294296
.def("id_type_string", &IndexVamana::id_type_string)

apis/python/src/tiledb/vector_search/vamana_index.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,5 +121,5 @@ def create(
121121
)
122122
index.train(empty_vector)
123123
index.add(empty_vector)
124-
index.write_index(ctx, uri, storage_version)
124+
index.write_index(ctx, uri, 0, storage_version)
125125
return VamanaIndex(uri=uri, config=config, memory_budget=1000000)

apis/python/test/test_index.py

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
import time
32

43
import numpy as np
54
import pytest
@@ -57,31 +56,20 @@ def check_default_metadata(
5756
assert type(group.meta["index_type"]) == str
5857

5958
assert "base_sizes" in group.meta
60-
if is_type_erased_index(expected_index_type):
61-
# NOTE(paris): Type-erased indexes have two values upon creation.
62-
assert group.meta["base_sizes"] == "[0,0]"
63-
else:
64-
assert group.meta["base_sizes"] == json.dumps([0])
59+
assert group.meta["base_sizes"] == json.dumps([0])
6560
assert type(group.meta["base_sizes"]) == str
6661

6762
assert "ingestion_timestamps" in group.meta
68-
if is_type_erased_index(expected_index_type):
69-
# NOTE(paris): Type-erased indexes have two values upon creation.
70-
ingestion_timestamps = json.loads(group.meta["ingestion_timestamps"])
71-
assert len(ingestion_timestamps) == 2
72-
assert ingestion_timestamps[0] == 0
73-
current_time_ms = int(time.time() * 1000)
74-
assert ingestion_timestamps[1] < current_time_ms
75-
assert ingestion_timestamps[1] > current_time_ms - 1000 * 5
76-
else:
77-
assert group.meta["ingestion_timestamps"] == json.dumps([0])
63+
assert group.meta["ingestion_timestamps"] == json.dumps([0])
7864
assert type(group.meta["ingestion_timestamps"]) == str
7965

8066
if not is_type_erased_index(expected_index_type):
8167
# NOTE(paris): Type-erased indexes do not write has_updates.
8268
assert "has_updates" in group.meta
8369
assert group.meta["has_updates"] == 0
8470
assert type(group.meta["has_updates"]) == np.int64
71+
else:
72+
assert "has_updates" not in group.meta
8573

8674

8775
def test_flat_index(tmp_path):
@@ -279,7 +267,7 @@ def test_vamana_index(tmp_path):
279267

280268
def test_delete_invalid_index(tmp_path):
281269
# We don't throw with an invalid uri.
282-
Index.delete_index(uri="invalid_uri", config=tiledb.cloud.Config())
270+
Index.delete_index(uri="invalid_uri", config={})
283271

284272

285273
def test_delete_index(tmp_path):
@@ -289,7 +277,7 @@ def test_delete_index(tmp_path):
289277
for index_type, index_class in zip(indexes, index_classes):
290278
index_uri = os.path.join(tmp_path, f"array_{index_type}")
291279
ingest(index_type=index_type, index_uri=index_uri, input_vectors=data)
292-
Index.delete_index(uri=index_uri, config=tiledb.cloud.Config())
280+
Index.delete_index(uri=index_uri, config={})
293281
with pytest.raises(tiledb.TileDBError) as error:
294282
index_class(uri=index_uri)
295283
assert "does not exist" in str(error.value)

apis/python/test/test_ingestion.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -607,11 +607,6 @@ def test_ingestion_with_updates_and_timetravel(tmp_path):
607607
gt_i, gt_d = get_groundtruth(dataset_dir, k)
608608

609609
for index_type, index_class in zip(INDEXES, INDEX_CLASSES):
610-
# TODO(paris): Fix Vamana bug and re-enable:
611-
# ValueError: New ingestion timestamp: 1 can't be smaller that the latest ingestion timestamp: 1713444057062
612-
if index_type == "VAMANA":
613-
continue
614-
615610
index_uri = os.path.join(tmp_path, f"array_{index_type}")
616611
index = ingest(
617612
index_type=index_type,
@@ -647,6 +642,11 @@ def test_ingestion_with_updates_and_timetravel(tmp_path):
647642
index = index_class(uri=index_uri, timestamp=101)
648643
_, result = index.query(queries, k=k, nprobe=partitions)
649644
assert accuracy(result, gt_i, updated_ids=updated_ids) == 1.0
645+
646+
# TODO(paris): Fix Vamana bug and re-enable:
647+
if index_type == "VAMANA":
648+
continue
649+
650650
index_uri = move_local_index_to_new_location(index_uri)
651651
index = index_class(uri=index_uri, timestamp=(0, 101))
652652
_, result = index.query(queries, k=k, nprobe=partitions)
@@ -837,11 +837,6 @@ def test_ingestion_with_additions_and_timetravel(tmp_path):
837837
gt_i, gt_d = get_groundtruth(dataset_dir, k)
838838

839839
for index_type, index_class in zip(INDEXES, INDEX_CLASSES):
840-
# TODO(paris): Fix Vamana bug and re-enable:
841-
# ValueError: New ingestion timestamp: 1 can't be smaller that the latest ingestion timestamp: 1713444057062
842-
if index_type == "VAMANA":
843-
continue
844-
845840
index_uri = os.path.join(tmp_path, f"array_{index_type}")
846841
index = ingest(
847842
index_type=index_type,
@@ -865,14 +860,17 @@ def test_ingestion_with_additions_and_timetravel(tmp_path):
865860
)
866861
updated_ids[i] = i + update_ids_offset
867862

868-
index_uri = move_local_index_to_new_location(index_uri)
863+
# TODO(paris): Fix Vamana bug and re-enable:
864+
# tiledb.cc.TileDBError: [TileDB::ArrayDirectory] Error: Cannot open array; Array does not exist.
865+
if index_type != "VAMANA":
866+
index_uri = move_local_index_to_new_location(index_uri)
869867
index = index_class(uri=index_uri)
870-
_, result = index.query(queries, k=k, nprobe=partitions)
871-
assert 0.45 < accuracy(result, gt_i) < 0.55
868+
_, result = index.query(queries, k=k, nprobe=partitions, opt_l=k * 2)
869+
assert 0.45 < accuracy(result, gt_i)
872870

873871
index = index.consolidate_updates()
874-
_, result = index.query(queries, k=k, nprobe=partitions)
875-
assert 0.45 < accuracy(result, gt_i) < 0.55
872+
_, result = index.query(queries, k=k, nprobe=partitions, opt_l=k * 2)
873+
assert 0.45 < accuracy(result, gt_i)
876874

877875

878876
def test_ivf_flat_ingestion_tdb_random_sampling_policy(tmp_path):

src/include/api/vamana_index.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,13 @@ class IndexVamana {
235235
void write_index(
236236
const tiledb::Context& ctx,
237237
const std::string& group_uri,
238-
const std::string& storage_version = "") const {
238+
std::optional<size_t> timestamp = std::nullopt,
239+
const std::string& storage_version = "") {
239240
if (!index_) {
240241
throw std::runtime_error(
241242
"Cannot write_index() because there is no index.");
242243
}
243-
index_->write_index(ctx, group_uri, storage_version);
244+
index_->write_index(ctx, group_uri, timestamp, storage_version);
244245
}
245246

246247
constexpr auto dimension() const {
@@ -291,7 +292,8 @@ class IndexVamana {
291292
virtual void write_index(
292293
const tiledb::Context& ctx,
293294
const std::string& group_uri,
294-
const std::string& storage_version) const = 0;
295+
std::optional<size_t> timestamp,
296+
const std::string& storage_version) = 0;
295297

296298
[[nodiscard]] virtual size_t dimension() const = 0;
297299
};
@@ -397,8 +399,9 @@ class IndexVamana {
397399
void write_index(
398400
const tiledb::Context& ctx,
399401
const std::string& group_uri,
400-
const std::string& storage_version) const override {
401-
impl_index_.write_index(ctx, group_uri, storage_version);
402+
std::optional<size_t> timestamp,
403+
const std::string& storage_version) override {
404+
impl_index_.write_index(ctx, group_uri, timestamp, storage_version);
402405
}
403406

404407
size_t dimension() const override {

src/include/index/index_group.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,8 @@ class base_index_group {
239239
if (exists(cached_ctx_)) {
240240
/** Load the current group metadata */
241241
init_for_open(cfg);
242-
if (index_timestamp_ < metadata_.ingestion_timestamps_.back()) {
242+
if (!metadata_.ingestion_timestamps_.empty() &&
243+
index_timestamp_ < metadata_.ingestion_timestamps_.back()) {
243244
throw std::runtime_error(
244245
"Requested write timestamp " + std::to_string(index_timestamp_) +
245246
" is not greater than " +
@@ -373,9 +374,6 @@ class base_index_group {
373374
auto get_previous_ingestion_timestamp() const {
374375
return metadata_.ingestion_timestamps_.back();
375376
}
376-
auto get_ingestion_timestamp() const {
377-
return metadata_.ingestion_timestamps_[timetravel_index_];
378-
}
379377
auto append_ingestion_timestamp(size_t timestamp) {
380378
metadata_.ingestion_timestamps_.push_back(timestamp);
381379
}

src/include/index/vamana_group.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,9 @@ class vamana_index_group : public base_index_group<vamana_index_group<Index>> {
257257
metadata_.adjacency_row_index_type_str_ =
258258
type_to_string_v<typename index_type::adjacency_row_index_type>;
259259

260-
metadata_.ingestion_timestamps_ = {0};
261-
metadata_.base_sizes_ = {0};
262-
metadata_.num_edges_history_ = {0};
260+
metadata_.ingestion_timestamps_ = {};
261+
metadata_.base_sizes_ = {};
262+
metadata_.num_edges_history_ = {};
263263
metadata_.temp_size_ = 0;
264264
metadata_.dimension_ = this->get_dimension();
265265

src/include/index/vamana_index.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,11 @@ class vamana_index {
840840
auto write_index(
841841
const tiledb::Context& ctx,
842842
const std::string& group_uri,
843-
const std::string& storage_version = "") const {
843+
std::optional<size_t> timestamp = std::nullopt,
844+
const std::string& storage_version = "") {
845+
if (timestamp.has_value()) {
846+
timestamp_ = timestamp.value();
847+
}
844848
// metadata: dimension, ntotal, L, R, B, alpha_min, alpha_max, medoid
845849
// Save as a group: metadata, feature_vectors, graph edges, offsets
846850

0 commit comments

Comments
 (0)