Skip to content

Commit 8685766

Browse files
authored
New interfaces in support of distributed execution (#68)
This PR includes some changes and refactoring to better support distributed execution of TileDB vector search. * Separated out the functionality for sorting query vectors according to the partitions where they will search for nearest neighbors into `partition_ivf_index()` * Updated `qv_query_heap_finite_ram()` to use `partition_ivf_index` * Updated `qv_query_heap_infinite_ram()` to use `partition_ivf_index` (Currently the file `qv.h` still contains the OG implementations of `qv_query_heap_*finite_ram()` in order to test and debug the new versions as needed. These will be removed in a future PR once we are satisfied the new implementation is correct and performs as well as the OG.) * Added `initializer_list` constructor for `Matrix` * Greatly simplified the interfaces and implementations of `tdbMatrix` and `tdbPartitionedMatrix`. Neither class loads any data when it is opened. Rather, the `load()` member function must be invoked to load the data. For the `tdbPartitionedMatrix`, this implements out-of-core operation -- subsequent calls to `load()` will load the next chunk of memory into the `tdbPartitionedMatrix` * Added more fine-grained timers to various functions in the C++ library * Fixed `flat.cc` so that all "flat" queries are correctly called * Fixed `sgemm` computations to handle `uint8_t` vector arrays (by copying the `uint8_t` array into a `float` array) * Various fixes to eliminate numerous warnings emitted by g++-12
1 parent edee144 commit 8685766

File tree

27 files changed

+1154
-727
lines changed

27 files changed

+1154
-727
lines changed

apis/python/src/tiledb/vector_search/module.cc

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ static void declareColMajorMatrixSubclass(py::module& mod,
187187
// TODO auto-namify
188188
PyTMatrix cls(mod, (name + suffix).c_str(), py::buffer_protocol());
189189
cls.def(py::init<const Ctx&, std::string, size_t>(), py::keep_alive<1,2>());
190+
191+
if constexpr (std::is_same<P, tdbColMajorMatrix<T>>::value) {
192+
cls.def("load", &TMatrix::load);
193+
}
190194
}
191195

192196
template <typename T>
@@ -277,22 +281,22 @@ PYBIND11_MODULE(_tiledbvspy, m) {
277281
/* Query API */
278282

279283
m.def("query_vq_f32",
280-
[](const ColMajorMatrix<float>& data,
281-
const ColMajorMatrix<float>& query_vectors,
284+
[](ColMajorMatrix<float>& data,
285+
ColMajorMatrix<float>& query_vectors,
282286
int k,
283287
bool nth,
284-
size_t nthreads) -> ColMajorMatrix<uint64_t> {
285-
auto r = detail::flat::vq_query_heap(data, query_vectors, k, nthreads);
288+
size_t nthreads) -> ColMajorMatrix<size_t> {
289+
auto r = detail::flat::vq_query_nth(data, query_vectors, k, true, nthreads);
286290
return r;
287291
});
288292

289293
m.def("query_vq_u8",
290-
[](const ColMajorMatrix<uint8_t>& data,
291-
const ColMajorMatrix<float>& query_vectors,
294+
[](tdbColMajorMatrix<uint8_t>& data,
295+
ColMajorMatrix<float>& query_vectors,
292296
int k,
293297
bool nth,
294-
size_t nthreads) -> ColMajorMatrix<uint64_t> {
295-
auto r = detail::flat::vq_query_heap(data, query_vectors, k, nthreads);
298+
size_t nthreads) -> ColMajorMatrix<size_t> {
299+
auto r = detail::flat::vq_query_nth(data, query_vectors, k, true, nthreads);
296300
return r;
297301
});
298302

apis/python/src/tiledb/vector_search/module.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,21 @@ def load_as_matrix(path: str, nqueries: int = 0, config: Dict = {}):
2424
a = tiledb.ArraySchema.load(path)
2525
dtype = a.attr(0).dtype
2626
if dtype == np.float32:
27-
return tdbColMajorMatrix_f32(ctx, path, nqueries)
27+
m = tdbColMajorMatrix_f32(ctx, path, nqueries)
2828
elif dtype == np.float64:
29-
return tdbColMajorMatrix_f64(ctx, path, nqueries)
29+
m = tdbColMajorMatrix_f64(ctx, path, nqueries)
3030
elif dtype == np.int32:
31-
return tdbColMajorMatrix_i32(ctx, path, nqueries)
31+
m = tdbColMajorMatrix_i32(ctx, path, nqueries)
3232
elif dtype == np.int32:
33-
return tdbColMajorMatrix_i64(ctx, path, nqueries)
33+
m = tdbColMajorMatrix_i64(ctx, path, nqueries)
3434
elif dtype == np.uint8:
35-
return tdbColMajorMatrix_u8(ctx, path, nqueries)
35+
m = tdbColMajorMatrix_u8(ctx, path, nqueries)
3636
# elif dtype == np.uint64:
3737
# return tdbColMajorMatrix_u64(ctx, path, nqueries)
3838
else:
3939
raise ValueError("Unsupported Matrix dtype: {}".format(a.attr(0).dtype))
40+
m.load()
41+
return m
4042

4143

4244
def load_as_array(path, return_matrix: bool = False, config: Dict = {}):

apis/python/test/test_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def test_flat_query():
5050

5151
r = vs.query_vq(db, targets, k, nqueries, 8) # k # nqueries # nthreads
5252

53-
ra = np.array(r, copy=False)
53+
ra = np.array(r, copy=True)
5454
print(ra)
5555
print(ra.shape)
5656

apis/python/test/test_module.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def test_tdbMatrix(tmpdir):
1313

1414
ctx = vspy.Ctx({})
1515
m = vspy.tdbColMajorMatrix_f32(ctx, p, 0)
16+
m.load()
1617
m_array = np.array(m)
1718
assert m_array.shape == data.shape
1819
assert np.array_equal(m_array, data)

src/benchmarks/setup.bash

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#!/bin/bash
22

3-
ec2_ivf_flat="/home/lums/feature-vector-prototype/src/cmake-build-release/src/ivf_flat"
4-
m1_ivf_flat="/Users/lums/TileDB/feature-vector-prototype/src/cmake-build-release/src/ivf_flat"
5-
ec2_flat="/home/lums/feature-vector-prototype/src/cmake-build-release/src/flat"
6-
m1_flat="/Users/lums/TileDB/feature-vector-prototype/src/cmake-build-release/src/flat"
3+
ec2_ivf_flat="/home/lums/TileDB-Vector-Search/src/cmake-build-release/libtiledbvectorsearch/src/ivf_flat"
4+
m1_ivf_flat="/Users/lums/TileDB/TileDB-Vector-Search/src/cmake-build-release/src/ivf_flat"
5+
ec2_flat="/home/lums/TileDB-Vector-Search/src/cmake-build-release/src/flat_l2"
6+
m1_flat="/Users/lums/TileDB/TileDB-Vector-Search/src/cmake-build-release/src/flat_l2"
7+
8+
79

810
if [ -f "${ivf_query}" ]; then
911
ivf_query="${ivf_query}"
@@ -25,11 +27,11 @@ else
2527
echo "Neither flat executable file exists"
2628
fi
2729

28-
# gp3_root=/home/lums/feature-vector-prototype/external/data/gp3
30+
# gp3_root=/home/lums/TileDB-Vector-Search/external/data/gp3
2931
nvme_root=/mnt/ssd
3032

31-
ec2_root="/home/lums/feature-vector-prototype/external/data/gp3"
32-
m1_root="/Users/lums/TileDB/feature-vector-prototype/external/data/gp3"
33+
ec2_root="/home/lums/TileDB-Vector-Search/external/data/gp3"
34+
m1_root="/Users/lums/TileDB/TileDB-Vector-Search/external/data/gp3"
3335

3436
if [ -d "${gp3_root}" ]; then
3537
gp3_root=${gp3_root}
@@ -418,7 +420,7 @@ function ivf_query() {
418420
shift 2
419421
;;
420422
--cluster|--nprobe)
421-
local _cluster="--cluster ${2}"
423+
local _cluster="--nprobe ${2}"
422424
shift 2
423425
;;
424426
--block|--blocksize)

src/cmake/.Superbuild.cmake.swo

4 KB
Binary file not shown.

src/include/defs.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,9 @@ auto mat_col_sum(
137137
auto num_cols = m.num_cols();
138138
auto num_rows = m.num_rows();
139139

140-
for (int j = 0; j < num_cols; ++j) {
140+
for (size_t j = 0; j < num_cols; ++j) {
141141
decltype(v[0]) vj = v[j];
142-
for (int i = 0; i < num_rows; ++i) {
142+
for (size_t i = 0; i < num_rows; ++i) {
143143
vj += f(m(i, j));
144144
}
145145
v[j] = vj;
@@ -238,7 +238,7 @@ auto get_top_k(const S& scores, int k, bool nth, int nthreads) {
238238

239239
auto num_queries = scores.num_cols();
240240

241-
auto top_k = ColMajorMatrix<uint64_t>(k, num_queries);
241+
auto top_k = ColMajorMatrix<size_t>(k, num_queries);
242242

243243
int q_block_size = (num_queries + nthreads - 1) / nthreads;
244244
std::vector<std::future<void>> futs;

src/include/detail/flat/gemm.h

Lines changed: 10 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -53,64 +53,29 @@ using namespace std::chrono_literals;
5353

5454
template <class DB, class Q>
5555
auto blocked_gemm_query(DB& db, Q& q, int k, bool nth, size_t nthreads) {
56-
scoped_timer _{"Total time " + tdb_func__};
56+
scoped_timer _{tdb_func__};
5757

5858
using element = std::pair<float, unsigned>;
5959

60-
// @todo constexpr block_db and block_q
61-
auto block_db = db.is_blocked();
62-
auto block_q = q.is_blocked();
63-
auto async_db = block_db && db.is_async();
64-
auto async_q = block_q && q.is_async();
65-
if (block_db && block_q) {
66-
throw std::runtime_error("Can't block both db and q");
67-
}
68-
6960
ColMajorMatrix<float> scores(db.num_cols(), q.num_cols());
7061

7162
std::vector<fixed_min_heap<element>> min_scores(
7263
size(q), fixed_min_heap<element>(k));
7364

74-
for (;;) {
75-
if (async_db) {
76-
db.advance_async();
77-
}
78-
if (async_q) {
79-
q.advance_async();
80-
}
65+
while (db.load()) {
8166
gemm_scores(db, q, scores, nthreads);
8267

8368
auto par = stdx::execution::indexed_parallel_policy{nthreads};
8469
stdx::range_for_each(
8570
std::move(par), scores, [&](auto&& q_vec, auto&& n = 0, auto&& i = 0) {
86-
if (block_db) {
87-
for (int j = 0; j < scores.num_rows(); ++j) {
88-
min_scores[i].insert({scores(j, i), j + db.offset()});
89-
}
90-
} else if (block_q) {
91-
for (int j = 0; j < scores.num_rows(); ++j) {
92-
min_scores[i + q.offset()].insert({scores(j, i), j});
93-
}
94-
} else {
95-
for (int j = 0; j < scores.num_rows(); ++j) {
96-
min_scores[i].insert({scores(j, i), j});
97-
}
71+
for (size_t j = 0; j < scores.num_rows(); ++j) {
72+
min_scores[i].insert({scores(j, i), j + db.col_offset()});
9873
}
9974
});
100-
101-
bool done = true;
102-
if (block_db) {
103-
done = async_db ? !db.advance_wait() : !db.advance();
104-
} else if (block_q) {
105-
done = async_q ? !q.advance_wait() : !q.advance();
106-
}
107-
if (done) {
108-
break;
109-
}
11075
}
11176

11277
ColMajorMatrix<size_t> top_k(k, q.num_cols());
113-
for (int j = 0; j < min_scores.size(); ++j) {
78+
for (size_t j = 0; j < size(min_scores); ++j) {
11479
// @todo get_top_k_from_heap
11580
std::sort_heap(min_scores[j].begin(), min_scores[j].end());
11681
std::transform(
@@ -125,11 +90,11 @@ auto blocked_gemm_query(DB& db, Q& q, int k, bool nth, size_t nthreads) {
12590

12691
template <class DB, class Q>
12792
auto gemm_partition(const DB& db, const Q& q, unsigned nthreads) {
128-
scoped_timer _{"Total time " + tdb_func__};
93+
scoped_timer _{tdb_func__};
12994

13095
auto scores = gemm_scores(db, q, nthreads);
13196

132-
auto top_k = std::vector<int>(q.num_cols());
97+
auto top_k = std::vector<size_t>(q.num_cols());
13398
{
13499
for (int i = 0; i < scores.num_cols(); ++i) {
135100
auto min_score = std::numeric_limits<float>::max();
@@ -151,21 +116,15 @@ auto gemm_partition(const DB& db, const Q& q, unsigned nthreads) {
151116

152117
template <class DB, class Q>
153118
auto blocked_gemm_partition(DB& db, Q& q, unsigned nthreads) {
154-
scoped_timer _{"Total time " + tdb_func__};
155-
156-
const auto block_db = db.is_blocked();
157-
const auto block_q = q.is_blocked();
158-
if (block_db && block_q) {
159-
throw std::runtime_error("Can't block both db and q");
160-
}
119+
scoped_timer _{tdb_func__};
161120

162121
ColMajorMatrix<float> scores(db.num_cols(), q.num_cols());
163122
auto _score_data = raveled(scores);
164123
auto top_k = std::vector<int>(q.num_cols());
165124
auto min_scores =
166125
std::vector<float>(q.num_cols(), std::numeric_limits<float>::max());
167126

168-
for (;;) {
127+
while (db.load()) {
169128
gemm_scores(db, q, scores, nthreads);
170129

171130
for (int i = 0; i < scores.num_cols(); ++i) {
@@ -181,18 +140,9 @@ auto blocked_gemm_partition(DB& db, Q& q, unsigned nthreads) {
181140
}
182141
top_k[i] = idx;
183142
}
184-
bool done = true;
185-
if (block_db) {
186-
done = !db.advance();
187-
} else {
188-
done = !q.advance();
189-
}
190-
if (done) {
191-
break;
192-
}
193143
}
194144
return top_k;
195145
}
196146
} // namespace detail::flat
197147

198-
#endif // TILEDB_FLAT_GEMM_H
148+
#endif // TILEDB_FLAT_GEMM_H

src/include/detail/flat/qv.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ auto qv_query_nth(
6565
const DB& db, const Q& q, int k, bool nth, unsigned int nthreads) {
6666
scoped_timer _{tdb_func__};
6767

68-
ColMajorMatrix<uint64_t> top_k(k, q.num_cols());
68+
ColMajorMatrix<size_t> top_k(k, q.num_cols());
6969

7070
auto par = stdx::execution::indexed_parallel_policy{nthreads};
7171
stdx::range_for_each(
@@ -101,7 +101,7 @@ auto qv_query_heap(const DB& db, const Q& q, size_t k, unsigned nthreads) {
101101

102102
using element = std::pair<float, int>;
103103

104-
ColMajorMatrix<uint64_t> top_k(k, q.num_cols());
104+
ColMajorMatrix<size_t> top_k(k, q.num_cols());
105105

106106
// Have to do explicit asynchronous threading here, as the current parallel
107107
// algorithms have iterator-based interaces, and the `Matrix` class does not
@@ -156,7 +156,7 @@ auto qv_partition(const DB& db, const Q& q, unsigned nthreads) {
156156
scoped_timer _{tdb_func__};
157157

158158
// Just need a single vector
159-
std::vector<unsigned> top_k(q.num_cols());
159+
std::vector<size_t> top_k(q.num_cols());
160160

161161
// Again, doing the parallelization by hand here....
162162
size_t size_db = db.num_cols();
@@ -178,7 +178,7 @@ auto qv_partition(const DB& db, const Q& q, unsigned nthreads) {
178178
float min_score = std::numeric_limits<float>::max();
179179
size_t idx = 0;
180180

181-
for (int i = 0; i < size_db; ++i) {
181+
for (size_t i = 0; i < size_db; ++i) {
182182
auto score = L2(q[j], db[i]);
183183
if (score < min_score) {
184184
min_score = score;

0 commit comments

Comments
 (0)