
Commit 0daabc2

Merge pull request #133 from TileDB-Inc/npapa/support-updates
Add automatic update array fragment consolidation
2 parents 2985579 + 23f5a51 commit 0daabc2

3 files changed: +87 -11 lines

apis/python/src/tiledb/vector_search/index.py

Lines changed: 47 additions & 8 deletions
@@ -40,22 +40,43 @@ def __init__(
 
     def query(self, queries: np.ndarray, k, **kwargs):
         updated_ids = set(self.read_updated_ids())
-        internal_results_d, internal_results_i = self.query_internal(queries, k, **kwargs)
+        retrieval_k = k
+        if len(updated_ids) > 0:
+            retrieval_k = 2 * k
+        internal_results_d, internal_results_i = self.query_internal(queries, retrieval_k, **kwargs)
         if self.update_arrays_uri is None:
-            return internal_results_d, internal_results_i
+            return internal_results_d[:, 0:k], internal_results_i[:, 0:k]
 
-        addition_results_d, addition_results_i = self.query_additions(queries, k)
         # Filter updated vectors
         query_id = 0
         for query in internal_results_i:
             res_id = 0
             for res in query:
                 if res in updated_ids:
                     internal_results_d[query_id, res_id] = MAX_FLOAT_32
-                    internal_results_i[query_id, res_id] = 0
+                    internal_results_i[query_id, res_id] = MAX_UINT64
                 res_id += 1
             query_id += 1
+        sort_index = np.argsort(internal_results_d, axis=1)
+        internal_results_d = np.take_along_axis(internal_results_d, sort_index, axis=1)
+        internal_results_i = np.take_along_axis(internal_results_i, sort_index, axis=1)
+
         # Merge update results
+        addition_results_d, addition_results_i = self.query_additions(queries, k)
+        if addition_results_d is None:
+            return internal_results_d[:, 0:k], internal_results_i[:, 0:k]
+
+        query_id = 0
+        for query in addition_results_d:
+            res_id = 0
+            for res in query:
+                if addition_results_d[query_id, res_id] == 0 and addition_results_i[query_id, res_id] == 0:
+                    addition_results_d[query_id, res_id] = MAX_FLOAT_32
+                    addition_results_i[query_id, res_id] = MAX_UINT64
+                res_id += 1
+            query_id += 1
+
+
         results_d = np.hstack((internal_results_d, addition_results_d))
         results_i = np.hstack((internal_results_i, addition_results_i))
         sort_index = np.argsort(results_d, axis=1)
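
Note: the rewritten query path over-fetches (retrieval_k = 2*k) whenever updates exist, masks stale hits by pushing their distance to MAX_FLOAT_32 and their id to MAX_UINT64 so they sort past every real result, re-sorts, and only then merges in results from the updates array. A minimal numpy-only sketch of the same mask-and-merge idea (constants and shapes here are illustrative, not the module's):

import numpy as np

MAX_FLOAT_32 = np.finfo(np.float32).max
MAX_UINT64 = np.iinfo(np.uint64).max

def mask_and_merge(internal_d, internal_i, addition_d, addition_i, updated_ids, k):
    # Mask base-index hits whose ids were updated or deleted so they
    # sort to the end of every row.
    stale = np.isin(internal_i, list(updated_ids))
    internal_d = np.where(stale, MAX_FLOAT_32, internal_d)
    internal_i = np.where(stale, MAX_UINT64, internal_i)
    # Merge with results computed over the updates array, then keep top-k.
    d = np.hstack((internal_d, addition_d))
    i = np.hstack((internal_i, addition_i))
    order = np.argsort(d, axis=1)
    return (np.take_along_axis(d, order, axis=1)[:, :k],
            np.take_along_axis(i, order, axis=1)[:, :k])

The vectorized np.isin/np.where masking is equivalent to the explicit double loop in the diff; the loop form simply mutates the result arrays in place.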
@@ -69,6 +90,8 @@ def query_internal(self, queries: np.ndarray, k, **kwargs):
     def query_additions(self, queries: np.ndarray, k):
         assert queries.dtype == np.float32
         additions_vectors, additions_external_ids = self.read_additions()
+        if additions_vectors is None:
+            return None, None
         queries_m = array_to_matrix(np.transpose(queries))
         d, i = query_vq_heap_pyarray(
             array_to_matrix(np.transpose(additions_vectors).astype(self.dtype)),
@@ -80,18 +103,25 @@ def query_additions(self, queries: np.ndarray, k):
 
     def update(self, vector: np.array, external_id: np.uint64):
         updates_array = self.open_updates_array()
-        updates_array[external_id] = vector
+        vectors = np.empty((1), dtype='O')
+        vectors[0] = vector
+        updates_array[external_id] = {'vector': vectors}
         updates_array.close()
+        self.consolidate_update_fragments()
 
     def update_batch(self, vectors: np.ndarray, external_ids: np.array):
         updates_array = self.open_updates_array()
         updates_array[external_ids] = {'vector': vectors}
         updates_array.close()
+        self.consolidate_update_fragments()
 
     def delete(self, external_id: np.uint64):
         updates_array = self.open_updates_array()
-        updates_array[external_id] = np.array([], dtype=self.dtype)
+        deletes = np.empty((1), dtype='O')
+        deletes[0] = np.array([], dtype=self.dtype)
+        updates_array[external_id] = {'vector': deletes}
         updates_array.close()
+        self.consolidate_update_fragments()
 
     def delete_batch(self, external_ids: np.array):
         updates_array = self.open_updates_array()
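
Note: update and delete now route single vectors through a 1-element numpy object array, matching the dict form update_batch already used; an empty array in the 'vector' slot marks a deletion. A hedged sketch of that write pattern, assuming a sparse updates array with a var-length 'vector' attribute (the URI and helper name are placeholders):

import numpy as np
import tiledb

def write_update(updates_uri: str, external_id: int, vector: np.ndarray):
    # Wrapping the vector in a 1-element object array makes TileDB-Py
    # store it as a single var-length cell instead of one cell per element.
    cell = np.empty((1,), dtype="O")
    cell[0] = vector  # use np.array([], dtype=vector.dtype) to mark a delete
    with tiledb.open(updates_uri, "w") as updates_array:
        updates_array[external_id] = {"vector": cell}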
@@ -100,6 +130,13 @@ def delete_batch(self, external_ids: np.array):
             deletes[i] = np.array([], dtype=self.dtype)
         updates_array[external_ids] = {'vector': deletes}
         updates_array.close()
+        self.consolidate_update_fragments()
+
+    def consolidate_update_fragments(self):
+        fragments_info = tiledb.array_fragments(self.update_arrays_uri)
+        if(len(fragments_info) > 10):
+            tiledb.consolidate(self.update_arrays_uri)
+            tiledb.vacuum(self.update_arrays_uri)
 
     def get_updates_uri(self):
         return self.update_arrays_uri
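
Note: consolidate_update_fragments is what the PR title refers to: every update/delete call above writes its own fragment to the updates array, so once more than 10 fragments accumulate they are merged and the pre-consolidation fragments vacuumed. The same check can be run standalone against any TileDB array; a minimal sketch (the threshold parameter is an assumption, the diff hard-codes 10):

import tiledb

def maybe_consolidate(uri: str, max_fragments: int = 10):
    # Each write creates a new fragment; many small fragments slow down reads.
    fragments_info = tiledb.array_fragments(uri)
    if len(fragments_info) > max_fragments:
        tiledb.consolidate(uri)  # merge small fragments into one
        tiledb.vacuum(uri)       # remove the now-redundant originals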
@@ -111,8 +148,10 @@ def read_additions(self) -> (np.ndarray, np.array):
         q = updates_array.query(attrs=('vector',), coords=True)
         data = q[:]
         additions_filter = [len(item) > 0 for item in data["vector"]]
-        return np.vstack(data["vector"][additions_filter]), data["external_id"][additions_filter]
-
+        if len(data["external_id"][additions_filter]) > 0:
+            return np.vstack(data["vector"][additions_filter]), data["external_id"][additions_filter]
+        else:
+            return None, None
     def read_updated_ids(self) -> np.array:
         if self.update_arrays_uri is None:
             return np.array([], np.uint64)

apis/python/src/tiledb/vector_search/ivf_flat_index.py

Lines changed: 1 addition & 1 deletion
@@ -334,4 +334,4 @@ def dist_qv_udf(
             tmp.append((float(0.0), int(0)))
         results_per_query_d.append(np.array(tmp, dtype=np.dtype("float,uint64"))["f0"])
         results_per_query_i.append(np.array(tmp, dtype=np.dtype("float,uint64"))["f1"])
-    return results_per_query_d, results_per_query_i
+    return np.array(results_per_query_d), np.array(results_per_query_i)
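
Note: this one-line change matters because the caller treats the per-query results as a 2-D matrix. With a plain Python list of 1-D arrays, axis-aware numpy ops either collapse the rows or fail outright; a small illustration with made-up values:

import numpy as np

per_query = [np.array([0.1, 0.9]), np.array([0.3, 0.2])]  # one row per query

np.hstack(per_query).shape   # (4,) -- the list form flattens the rows together

d = np.array(per_query)      # shape (2, 2): one row per query, as intended
order = np.argsort(d, axis=1)
np.take_along_axis(d, order, axis=1)  # sorts each query's results independently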

apis/python/test/test_ingestion.py

Lines changed: 39 additions & 2 deletions
@@ -307,13 +307,50 @@ def test_ivf_flat_ingestion_with_updates(tmp_path):
     _, result = index.query(query_vectors, k=k, nprobe=nprobe)
     assert accuracy(result, gt_i) > MINIMUM_ACCURACY
 
+    updated_ids = {}
+    for i in range(100):
+        index.delete(external_id=i)
+        index.update(vector=data[i].astype(dtype), external_id=i + 1000000)
+        updated_ids[i + 1000000] = i
+
+    _, result = index.query(query_vectors, k=k, nprobe=nprobe)
+    assert accuracy(result, gt_i, updated_ids=updated_ids) > MINIMUM_ACCURACY
+
+    index = index.consolidate_updates()
+    _, result = index.query(query_vectors, k=k, nprobe=nprobe)
+    assert accuracy(result, gt_i, updated_ids=updated_ids) > MINIMUM_ACCURACY
+
+def test_ivf_flat_ingestion_with_batch_updates(tmp_path):
+    dataset_dir = os.path.join(tmp_path, "dataset")
+    index_uri = os.path.join(tmp_path, "array")
+    k = 10
+    size = 100000
+    partitions = 100
+    dimensions = 128
+    nqueries = 100
+    nprobe = 20
+    data = create_random_dataset_u8(nb=size, d=dimensions, nq=nqueries, k=k, path=dataset_dir)
+    dtype = np.uint8
+
+    query_vectors = get_queries(dataset_dir, dtype=dtype)
+    gt_i, gt_d = get_groundtruth(dataset_dir, k)
+    index = ingest(
+        index_type="IVF_FLAT",
+        index_uri=index_uri,
+        source_uri=os.path.join(dataset_dir, "data.u8bin"),
+        partitions=partitions,
+        input_vectors_per_work_item=int(size / 10),
+    )
+    _, result = index.query(query_vectors, k=k, nprobe=nprobe)
+    assert accuracy(result, gt_i) > MINIMUM_ACCURACY
+
     update_ids = {}
     updated_ids = {}
     for i in range(0, 100000, 2):
         update_ids[i] = i + 1000000
         updated_ids[i + 1000000] = i
-    external_ids = np.zeros((len(update_ids)*2), dtype=np.uint64)
-    updates = np.empty((len(update_ids)*2), dtype='O')
+    external_ids = np.zeros((len(update_ids) * 2), dtype=np.uint64)
+    updates = np.empty((len(update_ids) * 2), dtype='O')
     id = 0
     for prev_id, new_id in update_ids.items():
         external_ids[id] = prev_id
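
Note: together the two tests cover the whole update lifecycle: mutate, query against the merged view, consolidate, query again. A condensed sketch of that flow using the same API surface the tests exercise (URIs, shapes, and dtypes are placeholders, and the ingest import path is assumed from the package layout):

import numpy as np
from tiledb.vector_search.ingestion import ingest

index = ingest(index_type="IVF_FLAT", index_uri="/tmp/index",
               source_uri="/tmp/dataset/data.u8bin", partitions=100)

index.delete(external_id=0)  # writes an empty vector as a tombstone
index.update(vector=np.zeros(128, dtype=np.uint8), external_id=1000000)

# Queries merge the base index with the updates array on the fly ...
queries = np.random.rand(10, 128).astype(np.float32)
_, ids = index.query(queries, k=10, nprobe=20)

# ... until consolidation folds the updates back into the index.
index = index.consolidate_updates()
_, ids = index.query(queries, k=10, nprobe=20)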
