Skip to content

Commit 45e0ca9

Browse files
author
Nikos Papailiou
committed
Merge branch 'main' into npapa/support-ingestion-with-updates
2 parents 525160d + a1511dc commit 45e0ca9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+4652
-1642
lines changed

.github/workflows/quarto-render.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Cloned from https://github.com/TileDB-Inc/tiledb-quarto-template
22

33
name: Render and deploy Quarto files
4-
on:
4+
on:
55
push:
66
pull_request:
77

@@ -36,7 +36,7 @@ jobs:
3636
- name: "Quarto render"
3737
shell: bash
3838
run: |
39-
pip install quartodoc PyYAML click griffe==0.32.3
39+
pip install quartodoc PyYAML click "griffe<0.33" # incompatible
4040
# create a symlink to the tiledb vector_search python package, so it doesn't have to be installed
4141
#ln -s apis/python/src/tiledb/vector_search
4242
quartodoc build

apis/python/src/tiledb/vector_search/flat_index.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,6 @@ def query_internal(
7575
assert queries.dtype == np.float32
7676

7777
queries_m = array_to_matrix(np.transpose(queries))
78-
r = query_vq_heap(self._db, queries_m, self._ids, k, nthreads)
78+
d, i = query_vq_heap(self._db, queries_m, self._ids, k, nthreads)
7979

80-
return np.transpose(np.array(r))
80+
return np.transpose(np.array(d)), np.transpose(np.array(i))

apis/python/src/tiledb/vector_search/index.py

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import numpy as np
2+
import sys
23

34
from tiledb.vector_search.module import *
45
from tiledb.vector_search.storage_formats import storage_formats
56
from typing import Any, Mapping, Optional
67

7-
MAX_UINT64 = 2 ** 63 - 1
8+
MAX_UINT64 = np.iinfo(np.dtype("uint64")).max
9+
MAX_FLOAT_32 = np.finfo(np.dtype("float32")).max
810

911

1012
class Index:
@@ -37,43 +39,44 @@ def __init__(
3739

3840

3941
def query(self, queries: np.ndarray, k, **kwargs):
40-
# TODO merge results based on scores and use higher k to improve retrieval
4142
updated_ids = set(self.read_updated_ids())
42-
internal_results = self.query_internal(queries, k, **kwargs)
43+
internal_results_d, internal_results_i = self.query_internal(queries, k, **kwargs)
4344
if self.update_arrays_uri is None:
44-
return internal_results
45-
addition_results = self.query_additions(queries, k)
46-
merged_results = np.zeros((queries.shape[0], k), dtype=np.uint64)
45+
return internal_results_d, internal_results_i
46+
47+
addition_results_d, addition_results_i = self.query_additions(queries, k)
48+
# Filter updated vectors
4749
query_id = 0
48-
for query in internal_results:
50+
for query in internal_results_i:
4951
res_id = 0
50-
additional_res_id = 0
5152
for res in query:
5253
if res in updated_ids:
53-
merged_results[query_id, res_id] = addition_results[query_id, additional_res_id]
54-
additional_res_id += 1
55-
else:
56-
merged_results[query_id, res_id] = res
54+
internal_results_d[query_id, res_id] = MAX_FLOAT_32
55+
internal_results_i[query_id, res_id] = 0
5756
res_id += 1
5857
query_id += 1
59-
return merged_results
58+
# Merge update results
59+
results_d = np.hstack((internal_results_d, addition_results_d))
60+
results_i = np.hstack((internal_results_i, addition_results_i))
61+
sort_index = np.argsort(results_d, axis=1)
62+
results_d = np.take_along_axis(results_d, sort_index, axis=1)
63+
results_i = np.take_along_axis(results_i, sort_index, axis=1)
64+
return results_d[:, 0:k], results_i[:, 0:k]
6065

6166
def query_internal(self, queries: np.ndarray, k, **kwargs):
6267
raise NotImplementedError
6368

6469
def query_additions(self, queries: np.ndarray, k):
6570
assert queries.dtype == np.float32
66-
6771
additions_vectors, additions_external_ids = self.read_additions()
6872
queries_m = array_to_matrix(np.transpose(queries))
69-
r = query_vq_heap_pyarray(
73+
d, i = query_vq_heap_pyarray(
7074
array_to_matrix(np.transpose(additions_vectors).astype(self.dtype)),
7175
queries_m,
7276
StdVector_u64(additions_external_ids),
7377
k,
7478
8)
75-
76-
return np.transpose(np.array(r))
79+
return np.transpose(np.array(d)), np.transpose(np.array(i))
7780

7881
def update(self, vector: np.array, external_id: np.uint64):
7982
updates_array = self.open_updates_array()
@@ -129,7 +132,7 @@ def open_updates_array(self):
129132
if tiledb.array_exists(updates_array_uri):
130133
raise RuntimeError(f"Array {updates_array_uri} already exists.")
131134
external_id_dim = tiledb.Dim(
132-
name="external_id", domain=(0, MAX_UINT64), dtype=np.dtype(np.uint64)
135+
name="external_id", domain=(0, MAX_UINT64-1), dtype=np.dtype(np.uint64)
133136
)
134137
dom = tiledb.Domain(external_id_dim)
135138
vector_attr = tiledb.Attr(name="vector", dtype=self.dtype, var=True)

apis/python/src/tiledb/vector_search/ivf_flat_index.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def query_internal(
130130
if mode is None:
131131
queries_m = array_to_matrix(np.transpose(queries))
132132
if self.memory_budget == -1:
133-
r = ivf_query_ram(
133+
d, i = ivf_query_ram(
134134
self.dtype,
135135
self._db,
136136
self._centroids,
@@ -139,13 +139,12 @@ def query_internal(
139139
self._ids,
140140
nprobe=nprobe,
141141
k_nn=k,
142-
nth=True, # ??
143142
nthreads=nthreads,
144143
ctx=self.ctx,
145144
use_nuv_implementation=use_nuv_implementation,
146145
)
147146
else:
148-
r = ivf_query(
147+
d, i = ivf_query(
149148
self.dtype,
150149
self.db_uri,
151150
self._centroids,
@@ -155,13 +154,12 @@ def query_internal(
155154
nprobe=nprobe,
156155
k_nn=k,
157156
memory_budget=self.memory_budget,
158-
nth=True, # ??
159157
nthreads=nthreads,
160158
ctx=self.ctx,
161159
use_nuv_implementation=use_nuv_implementation,
162160
)
163161

164-
return np.transpose(np.array(r))
162+
return np.transpose(np.array(d)), np.transpose(np.array(i))
165163
else:
166164
return self.taskgraph_query(
167165
queries=queries,
@@ -322,7 +320,8 @@ def dist_qv_udf(
322320
res = node.result()
323321
results.append(res)
324322

325-
results_per_query = []
323+
results_per_query_d = []
324+
results_per_query_i = []
326325
for q in range(queries.shape[0]):
327326
tmp_results = []
328327
for j in range(k):
@@ -333,5 +332,6 @@ def dist_qv_udf(
333332
tmp = sorted(tmp_results, key=lambda t: t[0])[0:k]
334333
for j in range(len(tmp), k):
335334
tmp.append((float(0.0), int(0)))
336-
results_per_query.append(np.array(tmp, dtype=np.dtype("float,int"))["f1"])
337-
return results_per_query
335+
results_per_query_d.append(np.array(tmp, dtype=np.dtype("float,uint64"))["f0"])
336+
results_per_query_i.append(np.array(tmp, dtype=np.dtype("float,uint64"))["f1"])
337+
return results_per_query_d, results_per_query_i

apis/python/src/tiledb/vector_search/module.cc

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,13 @@
55
#include <pybind11/stl.h>
66

77
#include "linalg.h"
8-
#include "ivf_index.h"
98
#include "ivf_query.h"
109
#include "flat_query.h"
1110

1211
namespace py = pybind11;
1312
using Ctx = tiledb::Context;
1413

1514
bool global_debug = false;
16-
double global_time_of_interest;
1715

1816
bool enable_stats = false;
1917
std::vector<json> core_stats;
@@ -103,6 +101,19 @@ static void declare_pyarray_to_matrix(py::module& m, const std::string& suffix)
103101
});
104102
}
105103

104+
namespace {
105+
template <typename ...TArgs>
106+
py::tuple make_python_pair(std::tuple<TArgs...>&& arg) {
107+
static_assert(sizeof...(TArgs) == 2, "Must have exactly two arguments");
108+
109+
return py::make_tuple<py::return_value_policy::automatic>(
110+
py::cast(std::get<0>(arg), py::return_value_policy::move),
111+
py::cast(std::get<1>(arg), py::return_value_policy::move)
112+
);
113+
}
114+
115+
}
116+
106117
template <typename T, typename Id_Type = uint64_t>
107118
static void declare_qv_query_heap_infinite_ram(py::module& m, const std::string& suffix) {
108119
m.def(("qv_query_heap_infinite_ram_" + suffix).c_str(),
@@ -113,8 +124,7 @@ static void declare_qv_query_heap_infinite_ram(py::module& m, const std::string&
113124
std::vector<Id_Type>& ids,
114125
size_t nprobe,
115126
size_t k_nn,
116-
bool nth,
117-
size_t nthreads) -> ColMajorMatrix<size_t> { // TODO change return type
127+
size_t nthreads) -> py::tuple { //std::pair<ColMajorMatrix<float>, ColMajorMatrix<size_t>> { // TODO change return type
118128

119129
auto r = detail::ivf::qv_query_heap_infinite_ram(
120130
parts,
@@ -124,9 +134,8 @@ static void declare_qv_query_heap_infinite_ram(py::module& m, const std::string&
124134
ids,
125135
nprobe,
126136
k_nn,
127-
nth,
128137
nthreads);
129-
return r;
138+
return make_python_pair(std::move(r));
130139
}, py::keep_alive<1,2>());
131140
}
132141

@@ -142,8 +151,7 @@ static void declare_qv_query_heap_finite_ram(py::module& m, const std::string& s
142151
size_t nprobe,
143152
size_t k_nn,
144153
size_t upper_bound,
145-
bool nth,
146-
size_t nthreads) -> ColMajorMatrix<size_t> { // TODO change return type
154+
size_t nthreads) -> py::tuple { //std::tuple<ColMajorMatrix<float>, ColMajorMatrix<size_t>> { // TODO change return type
147155

148156
auto r = detail::ivf::qv_query_heap_finite_ram<T, Id_Type>(
149157
ctx,
@@ -155,9 +163,8 @@ static void declare_qv_query_heap_finite_ram(py::module& m, const std::string& s
155163
nprobe,
156164
k_nn,
157165
upper_bound,
158-
nth,
159166
nthreads);
160-
return r;
167+
return make_python_pair(std::move(r));
161168
}, py::keep_alive<1,2>());
162169
}
163170

@@ -171,8 +178,7 @@ static void declare_nuv_query_heap_infinite_ram(py::module& m, const std::string
171178
std::vector<Id_Type>& ids,
172179
size_t nprobe,
173180
size_t k_nn,
174-
bool nth,
175-
size_t nthreads) -> ColMajorMatrix<size_t> { // TODO change return type
181+
size_t nthreads) -> std::tuple<ColMajorMatrix<float>, ColMajorMatrix<size_t>> { // TODO change return type
176182

177183
auto r = detail::ivf::nuv_query_heap_infinite_ram_reg_blocked(
178184
parts,
@@ -182,7 +188,6 @@ static void declare_nuv_query_heap_infinite_ram(py::module& m, const std::string
182188
ids,
183189
nprobe,
184190
k_nn,
185-
nth,
186191
nthreads);
187192
return r;
188193
}, py::keep_alive<1,2>());
@@ -200,8 +205,7 @@ static void declare_nuv_query_heap_finite_ram(py::module& m, const std::string&
200205
size_t nprobe,
201206
size_t k_nn,
202207
size_t upper_bound,
203-
bool nth,
204-
size_t nthreads) -> ColMajorMatrix<size_t> { // TODO change return type
208+
size_t nthreads) -> std::tuple<ColMajorMatrix<float>, ColMajorMatrix<size_t>> { // TODO change return type
205209

206210
auto r = detail::ivf::nuv_query_heap_finite_ram_reg_blocked<T, Id_Type>(
207211
ctx,
@@ -213,7 +217,6 @@ static void declare_nuv_query_heap_finite_ram(py::module& m, const std::string&
213217
nprobe,
214218
k_nn,
215219
upper_bound,
216-
nth,
217220
nthreads);
218221
return r;
219222
}, py::keep_alive<1,2>());
@@ -398,7 +401,7 @@ static void declare_vq_query_heap(py::module& m, const std::string& suffix) {
398401
ColMajorMatrix<float>& query_vectors,
399402
const std::vector<uint64_t> &ids,
400403
int k,
401-
size_t nthreads) -> ColMajorMatrix<size_t> {
404+
size_t nthreads) -> std::tuple<ColMajorMatrix<float>, ColMajorMatrix<size_t>> {
402405
auto r = detail::flat::vq_query_heap(data, query_vectors, ids, k, nthreads);
403406
return r;
404407
});
@@ -411,7 +414,7 @@ static void declare_vq_query_heap_pyarray(py::module& m, const std::string& suff
411414
ColMajorMatrix<float>& query_vectors,
412415
const std::vector<uint64_t> &ids,
413416
int k,
414-
size_t nthreads) -> ColMajorMatrix<size_t> {
417+
size_t nthreads) -> std::tuple<ColMajorMatrix<float>, ColMajorMatrix<size_t>> {
415418
auto r = detail::flat::vq_query_heap(data, query_vectors, ids, k, nthreads);
416419
return r;
417420
});
@@ -494,17 +497,17 @@ PYBIND11_MODULE(_tiledbvspy, m) {
494497
[](ColMajorMatrix<float>& data,
495498
ColMajorMatrix<float>& query_vectors,
496499
int k,
497-
size_t nthreads) -> ColMajorMatrix<size_t> {
498-
auto r = detail::flat::vq_query_nth(data, query_vectors, k, true, nthreads);
500+
size_t nthreads) -> std::tuple<ColMajorMatrix<float>, ColMajorMatrix<size_t>> {
501+
auto r = detail::flat::vq_query_heap(data, query_vectors, k, nthreads);
499502
return r;
500503
});
501504

502505
m.def("query_vq_u8",
503506
[](tdbColMajorMatrix<uint8_t>& data,
504507
ColMajorMatrix<float>& query_vectors,
505508
int k,
506-
size_t nthreads) -> ColMajorMatrix<size_t> {
507-
auto r = detail::flat::vq_query_nth(data, query_vectors, k, true, nthreads);
509+
size_t nthreads) -> std::tuple<ColMajorMatrix<float>, ColMajorMatrix<size_t>> {
510+
auto r = detail::flat::vq_query_heap(data, query_vectors, k, nthreads);
508511
return r;
509512
});
510513

apis/python/src/tiledb/vector_search/module.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,6 @@ def ivf_query_ram(
207207
ids: "Vector",
208208
nprobe: int,
209209
k_nn: int,
210-
nth: bool,
211210
nthreads: int,
212211
ctx: "Ctx" = None,
213212
use_nuv_implementation: bool = False,
@@ -233,8 +232,6 @@ def ivf_query_ram(
233232
Number of probes
234233
k_nn: int
235234
Number of nn
236-
nth: bool
237-
Return nth records
238235
nthreads: int
239236
Number of threads
240237
ctx: Ctx
@@ -252,7 +249,6 @@ def ivf_query_ram(
252249
ids,
253250
nprobe,
254251
k_nn,
255-
nth,
256252
nthreads,
257253
]
258254
)
@@ -281,7 +277,6 @@ def ivf_query(
281277
nprobe: int,
282278
k_nn: int,
283279
memory_budget: int,
284-
nth: bool,
285280
nthreads: int,
286281
ctx: "Ctx" = None,
287282
use_nuv_implementation: bool = False,
@@ -309,8 +304,6 @@ def ivf_query(
309304
Number of nn
310305
memory_budget: int
311306
Main memory budget
312-
nth: bool
313-
Return nth records
314307
nthreads: int
315308
Number of threads
316309
ctx: Ctx
@@ -330,7 +323,6 @@ def ivf_query(
330323
nprobe,
331324
k_nn,
332325
memory_budget,
333-
nth,
334326
nthreads,
335327
]
336328
)

0 commit comments

Comments
 (0)