Skip to content

Commit f4eac93

Browse files
author
Nikos Papailiou
committed
Distributed query implementation
1 parent 5fa1323 commit f4eac93

File tree

4 files changed

+119
-31
lines changed

4 files changed

+119
-31
lines changed

apis/python/src/tiledb/vector_search/index.py

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,18 @@
33

44
import numpy as np
55
from tiledb.vector_search.module import *
6+
from tiledb.cloud.dag import Mode
67

78
CENTROIDS_ARRAY_NAME = "centroids.tdb"
89
INDEX_ARRAY_NAME = "index.tdb"
910
IDS_ARRAY_NAME = "ids.tdb"
1011
PARTS_ARRAY_NAME = "parts.tdb"
1112

13+
def submit_local(d, func, *args, **kwargs):
14+
# Drop kwarg
15+
kwargs.pop("image_name", None)
16+
kwargs.pop("resources", None)
17+
return d.submit_local(func, *args, **kwargs)
1218

1319
class Index:
1420
def query(self, targets: np.ndarray, k=10, nqueries=10, nthreads=8, nprobe=1):
@@ -179,6 +185,7 @@ def distributed_query(
179185
nthreads=8,
180186
nprobe=1,
181187
num_nodes=5,
188+
mode: Mode = Mode.REALTIME,
182189
):
183190
"""
184191
Distributed Query on top of an IVF_FLAT index
@@ -196,7 +203,64 @@ def distributed_query(
196203
nprobe: int
197204
number of probes
198205
"""
199-
assert targets.dtype == np.float32
206+
from tiledb.cloud import dag
207+
from tiledb.cloud.dag import Mode
208+
from tiledb.vector_search.module import array_to_matrix, partition_ivf_index, dist_qv
209+
import math
210+
import numpy as np
211+
from functools import partial
212+
213+
def dist_qv_udf(
214+
dtype: np.dtype,
215+
parts_uri: str,
216+
ids_uri: str,
217+
query_vectors: np.ndarray,
218+
active_partitions: np.array,
219+
active_queries: np.array,
220+
indices: np.array,
221+
k_nn: int):
222+
targets_m = array_to_matrix(query_vectors)
223+
r = dist_qv(
224+
dtype=dtype,
225+
parts_uri=parts_uri,
226+
ids_uri=ids_uri,
227+
query_vectors=targets_m,
228+
active_partitions=active_partitions,
229+
active_queries=active_queries,
230+
indices=indices,
231+
k_nn=k_nn,
232+
)
233+
results = []
234+
for q in range(len(r)):
235+
tmp_results = []
236+
for j in range(len(r[q])):
237+
tmp_results.append(r[q][j])
238+
results.append(tmp_results)
239+
return results
240+
241+
assert targets.dtype == self.dtype
242+
if mode == Mode.BATCH:
243+
d = dag.DAG(
244+
name="vector-query",
245+
mode=Mode.BATCH,
246+
max_workers=num_nodes,
247+
)
248+
if mode == Mode.REALTIME:
249+
d = dag.DAG(
250+
name="vector-query",
251+
mode=Mode.REALTIME,
252+
max_workers=num_nodes,
253+
)
254+
else:
255+
d = dag.DAG(
256+
name="vector-query",
257+
mode=Mode.REALTIME,
258+
max_workers=1,
259+
namespace="default",
260+
)
261+
submit = partial(submit_local, d)
262+
if mode == Mode.BATCH or mode == Mode.REALTIME:
263+
submit = d.submit
200264

201265
targets_m = array_to_matrix(targets)
202266
active_partitions, active_queries = partition_ivf_index(
@@ -207,29 +271,38 @@ def distributed_query(
207271
num_parts = len(active_partitions)
208272

209273
parts_per_node = int(math.ceil(num_parts / num_nodes))
210-
results = []
274+
nodes = []
211275
for part in range(0, num_parts, parts_per_node):
212276
part_end = part + parts_per_node
213277
if part_end > num_parts:
214278
part_end = num_parts
215-
results.append(dist_qv(
279+
nodes.append(submit(
280+
dist_qv_udf,
216281
dtype=self.dtype,
217282
parts_uri=self.parts_db_uri,
218283
ids_uri=self.ids_uri,
219-
query_vectors=targets_m,
284+
query_vectors=targets,
220285
active_partitions=np.array(active_partitions)[part:part_end],
221-
active_queries=np.array(active_queries[part:part_end]),
222-
indices=self._index,
286+
active_queries=np.array(active_queries[part:part_end], dtype=object),
287+
indices=np.array(self._index),
223288
k_nn=k,
224-
ctx=self.ctx,
289+
resource_class='large',
290+
image_name="3.9-vectorsearch",
225291
))
226292

293+
d.compute()
294+
d.wait()
295+
results = []
296+
for node in nodes:
297+
res = node.result()
298+
results.append(res)
299+
227300
results_per_query = []
228301
for q in range(targets.shape[1]):
229302
tmp_results = []
230303
for j in range(k):
231304
for r in results:
232-
if len(r[q]) > 0:
305+
if len(r[q]) > j:
233306
if r[q][j][0] > 0:
234307
tmp_results.append(r[q][j])
235308
results_per_query.append(sorted(tmp_results, key=lambda t: t[0])[0:k])

apis/python/src/tiledb/vector_search/module.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,8 +348,8 @@ void declareStdVector(py::module& m, const std::string& suffix) {
348348
template <typename T, typename indices_type = size_t>
349349
void declarePartitionIvfIndex(py::module& m, const std::string& suffix) {
350350
m.def(("partition_ivf_index_" + suffix).c_str(),
351-
[](ColMajorMatrix<T>& centroids,
352-
ColMajorMatrix<float>& query,
351+
[](ColMajorMatrix<float>& centroids,
352+
ColMajorMatrix<T>& query,
353353
size_t nprobe,
354354
size_t nthreads) {
355355
return detail::ivf::partition_ivf_index(centroids, query, nprobe, nthreads);

apis/python/src/tiledb/vector_search/module.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,9 @@ def ivf_query(
295295

296296

297297
def partition_ivf_index(centroids, query, nprobe=1, nthreads=0):
298-
if centroids.dtype == np.float32:
298+
if query.dtype == np.float32:
299299
return partition_ivf_index_f32(centroids, query, nprobe, nthreads)
300-
elif centroids.dtype == np.uint8:
300+
elif query.dtype == np.uint8:
301301
return partition_ivf_index_u8(centroids, query, nprobe, nthreads)
302302
else:
303303
raise TypeError("Unsupported type!")
@@ -307,9 +307,9 @@ def dist_qv(
307307
parts_uri: str,
308308
ids_uri: str,
309309
query_vectors: "colMajorMatrix",
310-
active_partitions: "Vector",
311-
active_queries: "Vector",
312-
indices: "Vector",
310+
active_partitions: np.array,
311+
active_queries: np.array,
312+
indices: np.array,
313313
k_nn: int,
314314
ctx: "Ctx" = None):
315315
if ctx is None:
@@ -321,7 +321,7 @@ def dist_qv(
321321
active_partitions,
322322
query_vectors,
323323
active_queries,
324-
indices,
324+
StdVector_u64(indices),
325325
ids_uri,
326326
k_nn
327327
]

apis/python/test/test_ingestion.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from tiledb.vector_search.ingestion import ingest
44
from tiledb.vector_search.index import IVFFlatIndex
5+
from tiledb.cloud.dag import Mode
56

67
MINIMUM_ACCURACY = 0.9
78

@@ -52,7 +53,10 @@ def test_ivf_flat_ingestion_u8(tmp_path):
5253
k = 10
5354
size = 100000
5455
partitions = 100
55-
create_random_dataset_u8(nb=size, d=100, nq=10, k=k, path=dataset_dir)
56+
dimensions = 128
57+
nqueries = 100
58+
nprobe = 20
59+
create_random_dataset_u8(nb=size, d=dimensions, nq=nqueries, k=k, path=dataset_dir)
5660
source_type = "U8BIN"
5761
dtype = np.uint8
5862

@@ -67,33 +71,40 @@ def test_ivf_flat_ingestion_u8(tmp_path):
6771
input_vectors_per_work_item=int(size / 10),
6872
)
6973
result = np.transpose(
70-
index.query(np.transpose(query_vectors), k=k, nprobe=10)
74+
index.query(np.transpose(query_vectors), k=k, nprobe=nprobe)
7175
)
7276
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
7377

7478
index_ram = IVFFlatIndex(uri=array_uri, dtype=dtype, memory_budget=int(size / 10))
7579
result = np.transpose(
76-
index_ram.query(np.transpose(query_vectors), k=k, nprobe=partitions)
80+
index_ram.query(np.transpose(query_vectors), k=k, nprobe=nprobe)
7781
)
7882
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
7983
result = np.transpose(
8084
index_ram.query(
8185
np.transpose(query_vectors),
8286
k=k,
83-
nprobe=partitions,
87+
nprobe=nprobe,
8488
use_nuv_implementation=True,
8589
)
8690
)
8791
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
8892

93+
result = index_ram.distributed_query(np.transpose(query_vectors.astype(np.uint8)), k=k, nprobe=nprobe, mode=Mode.LOCAL)
94+
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
8995

9096
def test_ivf_flat_ingestion_f32(tmp_path):
97+
import time
9198
dataset_dir = os.path.join(tmp_path, "dataset")
9299
array_uri = os.path.join(tmp_path, "array")
93100
k = 10
94101
size = 100000
102+
dimensions = 128
95103
partitions = 100
96-
create_random_dataset_f32(nb=size, d=100, nq=10, k=k, path=dataset_dir)
104+
nqueries = 100
105+
nprobe = 20
106+
107+
create_random_dataset_f32(nb=size, d=dimensions, nq=nqueries, k=k, path=dataset_dir)
97108
source_type = "F32BIN"
98109
dtype = np.float32
99110

@@ -109,29 +120,29 @@ def test_ivf_flat_ingestion_f32(tmp_path):
109120
input_vectors_per_work_item=int(size / 10),
110121
)
111122

112-
result = index.distributed_query(np.transpose(query_vectors), k=k, nprobe=partitions)
113-
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
114-
115123
result = np.transpose(
116-
index.query(np.transpose(query_vectors), k=k, nprobe=partitions)
124+
index.query(np.transpose(query_vectors), k=k, nprobe=nprobe)
117125
)
118126
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
119127

120128
index_ram = IVFFlatIndex(uri=array_uri, dtype=dtype, memory_budget=int(size / 10))
121129
result = np.transpose(
122-
index_ram.query(np.transpose(query_vectors), k=k, nprobe=partitions)
130+
index_ram.query(np.transpose(query_vectors), k=k, nprobe=nprobe)
123131
)
124132
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
125133
result = np.transpose(
126134
index_ram.query(
127135
np.transpose(query_vectors),
128136
k=k,
129-
nprobe=partitions,
137+
nprobe=nprobe,
130138
use_nuv_implementation=True,
131139
)
132140
)
133141
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
134142

143+
result = index_ram.distributed_query(np.transpose(query_vectors), k=k, nprobe=nprobe, mode=Mode.LOCAL)
144+
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
145+
135146

136147

137148
def test_ivf_flat_ingestion_fvec(tmp_path):
@@ -143,8 +154,9 @@ def test_ivf_flat_ingestion_fvec(tmp_path):
143154
array_uri = os.path.join(tmp_path, "array")
144155
k = 100
145156
dimensions = 128
146-
partitions = 1000
157+
partitions = 100
147158
nqueries = 100
159+
nprobe = 20
148160

149161
query_vectors = get_queries_fvec(
150162
queries_uri, dimensions=dimensions, nqueries=nqueries
@@ -159,21 +171,24 @@ def test_ivf_flat_ingestion_fvec(tmp_path):
159171
partitions=partitions,
160172
)
161173
result = np.transpose(
162-
index.query(np.transpose(query_vectors), k=k, nprobe=partitions)
174+
index.query(np.transpose(query_vectors), k=k, nprobe=nprobe)
163175
)
164176
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
165177

166178
index_ram = IVFFlatIndex(uri=array_uri, dtype=dtype)
167179
result = np.transpose(
168-
index_ram.query(np.transpose(query_vectors), k=k, nprobe=partitions)
180+
index_ram.query(np.transpose(query_vectors), k=k, nprobe=nprobe)
169181
)
170182
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
171183
result = np.transpose(
172184
index_ram.query(
173185
np.transpose(query_vectors),
174186
k=k,
175-
nprobe=partitions,
187+
nprobe=nprobe,
176188
use_nuv_implementation=True,
177189
)
178190
)
179191
assert accuracy(result, gt_i) > MINIMUM_ACCURACY
192+
193+
result = index_ram.distributed_query(np.transpose(query_vectors), k=k, nprobe=nprobe, mode=Mode.LOCAL)
194+
assert accuracy(result, gt_i) > MINIMUM_ACCURACY

0 commit comments

Comments
 (0)