Skip to content

Commit 00e1a21

Browse files
Enable OOC processing for IVF_FLAT distributed query execution (#418)
This applies OOC computation to IVF_FLAT distributed query execution by having a loop of `load()` calls and propagating `upper_bound` to `tdbPartitionedMatrix`. This will allow us to avoid OOM errors in UDF workers by streaming the respective work partitions in configurable batches. We don't yet pass `upper_bound` from Python, as we first need to release this change in cloud UDF images; instead we set a default of 200k vectors. The default upper_bound of 200k is selected by assuming vectors with 1k dimensions, each dimension using 4 bytes. In this case, each `load()` operation would fetch at most 800MB of data.
1 parent 6530a06 commit 00e1a21

File tree

2 files changed

+79
-49
lines changed

2 files changed

+79
-49
lines changed

apis/python/test/test_ingestion.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1892,3 +1892,30 @@ def test_ivf_flat_ingestion_with_training_source_uri_numpy(tmp_path):
18921892
expected_result_d=[[0]],
18931893
expected_result_i=[[1003]],
18941894
)
1895+
1896+
1897+
def test_ivf_flat_taskgraph_query(tmp_path):
    """End-to-end check of the task-graph (distributed) IVF_FLAT query path.

    Ingests a random uint8 dataset split across multiple work items, then runs
    ``_taskgraph_query`` with ``num_partitions=10`` in LOCAL mode so the query
    is executed as several partitioned sub-tasks, and asserts the recall
    against the ground truth stays above MINIMUM_ACCURACY.
    """
    dataset_dir = os.path.join(tmp_path, "dataset")
    index_uri = os.path.join(tmp_path, "array")
    k = 10
    size = 10000
    partitions = 100
    # 129 is deliberately not a multiple of a typical SIMD width/power of two;
    # presumably chosen to exercise unaligned-dimension handling — TODO confirm.
    dimensions = 129
    nqueries = 100
    nprobe = 20
    create_random_dataset_u8(nb=size, d=dimensions, nq=nqueries, k=k, path=dataset_dir)
    dtype = np.uint8

    queries = get_queries(dataset_dir, dtype=dtype)
    gt_i, gt_d = get_groundtruth(dataset_dir, k)
    index = ingest(
        index_type="IVF_FLAT",
        index_uri=index_uri,
        source_uri=os.path.join(dataset_dir, "data.u8bin"),
        partitions=partitions,
        # Force ingestion to split the input into ~10 work items so the
        # distributed code path is actually exercised.
        input_vectors_per_work_item=int(size / 10),
    )
    _, result = index._taskgraph_query(
        queries, k=k, nprobe=nprobe, nthreads=8, mode=Mode.LOCAL, num_partitions=10
    )
    assert accuracy(result, gt_i) > MINIMUM_ACCURACY

src/include/detail/ivf/dist_qv.h

Lines changed: 52 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ auto dist_qv_finite_ram_part(
7878
const std::string& id_uri,
7979
size_t k_nn,
8080
uint64_t timestamp = 0,
81+
// The default upper_bound of 200k is selected by assuming vectors with
82+
// 1k dimensions each using 4 bytes.
83+
// In this case, each load() operation would fetch at most 800MB of data.
84+
size_t upper_bound = 200000,
8185
size_t nthreads = std::thread::hardware_concurrency(),
8286
Distance&& distance = Distance{}) {
8387
if (nthreads == 0) {
@@ -109,70 +113,68 @@ auto dist_qv_finite_ram_part(
109113
global_indices,
110114
id_uri,
111115
dist_active_partitions,
112-
0,
116+
upper_bound,
113117
temporal_policy);
114118

115-
// We are assuming that we are not doing out of core computation here.
116-
// (It is easy enough to change this if we need to.)
117-
partitioned_vectors.load();
118-
119119
scoped_timer _i{tdb_func__ + " in RAM"};
120120

121121
auto min_scores =
122122
std::vector<fixed_min_pair_heap<score_type, shuffled_ids_type>>(
123123
num_queries,
124124
fixed_min_pair_heap<score_type, shuffled_ids_type>(k_nn));
125125

126-
if (::num_partitions(partitioned_vectors) != size(dist_active_partitions)) {
127-
throw std::runtime_error(
128-
"[dist_qv_finite_ram_part] num_partitions(partitioned_vectors) != "
129-
"size(dist_active_partitions)");
130-
}
131-
132-
auto current_part_size = ::num_partitions(partitioned_vectors);
133-
size_t parts_per_thread = (current_part_size + nthreads - 1) / nthreads;
134-
135-
std::vector<std::future<decltype(min_scores)>> futs;
136-
futs.reserve(nthreads);
126+
size_t part_offset = 0;
127+
while (partitioned_vectors.load()) {
128+
_i.start();
129+
auto current_part_size = ::num_partitions(partitioned_vectors);
130+
size_t parts_per_thread = (current_part_size + nthreads - 1) / nthreads;
137131

138-
for (size_t n = 0; n < nthreads; ++n) {
139-
auto first_part = std::min<size_t>(n * parts_per_thread, current_part_size);
140-
auto last_part =
141-
std::min<size_t>((n + 1) * parts_per_thread, current_part_size);
132+
std::vector<std::future<decltype(min_scores)>> futs;
133+
futs.reserve(nthreads);
142134

143-
if (first_part != last_part) {
144-
futs.emplace_back(std::async(
145-
std::launch::async,
146-
[&query,
147-
&partitioned_vectors,
148-
&active_queries = dist_active_queries,
149-
&distance,
150-
k_nn,
151-
first_part,
152-
last_part]() {
153-
return apply_query(
154-
partitioned_vectors,
155-
std::optional<std::vector<int>>{},
156-
// std::optional{active_partitions},
157-
query,
158-
active_queries,
159-
k_nn,
160-
first_part,
161-
last_part,
162-
0,
163-
distance);
164-
}));
135+
for (size_t n = 0; n < nthreads; ++n) {
136+
auto first_part =
137+
std::min<size_t>(n * parts_per_thread, current_part_size);
138+
auto last_part =
139+
std::min<size_t>((n + 1) * parts_per_thread, current_part_size);
140+
141+
if (first_part != last_part) {
142+
futs.emplace_back(std::async(
143+
std::launch::async,
144+
[&query,
145+
&partitioned_vectors,
146+
&active_queries = dist_active_queries,
147+
&distance,
148+
k_nn,
149+
first_part,
150+
last_part,
151+
part_offset]() {
152+
return apply_query(
153+
partitioned_vectors,
154+
std::optional<std::vector<int>>{},
155+
// std::optional{dist_active_partitions},
156+
query,
157+
active_queries,
158+
k_nn,
159+
first_part,
160+
last_part,
161+
part_offset,
162+
distance);
163+
}));
164+
}
165165
}
166-
}
166+
for (size_t n = 0; n < size(futs); ++n) {
167+
auto min_n = futs[n].get();
167168

168-
for (size_t n = 0; n < size(futs); ++n) {
169-
auto min_n = futs[n].get();
170-
171-
for (size_t j = 0; j < num_queries; ++j) {
172-
for (auto&& [e, f] : min_n[j]) {
173-
min_scores[j].insert(e, f);
169+
for (size_t j = 0; j < num_queries; ++j) {
170+
for (auto&& [e, f] : min_n[j]) {
171+
min_scores[j].insert(e, f);
172+
}
174173
}
175174
}
175+
176+
part_offset += current_part_size;
177+
_i.stop();
176178
}
177179
return min_scores;
178180
}
@@ -334,6 +336,7 @@ auto dist_qv_finite_ram(
334336
id_uri,
335337
k_nn,
336338
timestamp,
339+
upper_bound,
337340
nthreads,
338341
distance);
339342
#else

0 commit comments

Comments
 (0)