Skip to content

Commit 37f2f9d

Browse files
authored
Update IVF_PQ to use temp directory pattern to support parallel ingestion (#554)
1 parent eaf1a5f commit 37f2f9d

File tree

7 files changed

+195
-135
lines changed

7 files changed

+195
-135
lines changed

apis/python/src/tiledb/vector_search/ingestion.py

Lines changed: 40 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -317,16 +317,11 @@ def ingest(
317317
EXTERNAL_IDS_ARRAY_NAME = storage_formats[storage_version][
318318
"EXTERNAL_IDS_ARRAY_NAME"
319319
]
320-
if index_type == "IVF_PQ":
321-
PARTIAL_WRITE_ARRAY_DIR = storage_formats[storage_version][
322-
"PARTIAL_WRITE_ARRAY_DIR"
323-
]
324-
else:
325-
PARTIAL_WRITE_ARRAY_DIR = (
326-
storage_formats[storage_version]["PARTIAL_WRITE_ARRAY_DIR"]
327-
+ "_"
328-
+ "".join(random.choices(string.ascii_letters, k=10))
329-
)
320+
PARTIAL_WRITE_ARRAY_DIR = (
321+
storage_formats[storage_version]["PARTIAL_WRITE_ARRAY_DIR"]
322+
+ "_"
323+
+ "".join(random.choices(string.ascii_letters, k=10))
324+
)
330325
DEFAULT_ATTR_FILTERS = storage_formats[storage_version]["DEFAULT_ATTR_FILTERS"]
331326

332327
# This is used to auto-configure `input_vectors_per_work_item`
@@ -603,13 +598,26 @@ def create_temp_data_group(
603598
group: tiledb.Group,
604599
) -> tiledb.Group:
605600
partial_write_array_dir_uri = f"{group.uri}/{PARTIAL_WRITE_ARRAY_DIR}"
606-
try:
607-
tiledb.group_create(partial_write_array_dir_uri)
608-
add_to_group(group, partial_write_array_dir_uri, PARTIAL_WRITE_ARRAY_DIR)
609-
except tiledb.TileDBError as err:
610-
message = str(err)
611-
if "already exists" not in message:
612-
raise err
601+
if index_type == "IVF_PQ":
602+
ctx = vspy.Ctx(config)
603+
index = vspy.IndexIVFPQ(
604+
ctx,
605+
index_group_uri,
606+
vspy.IndexLoadStrategy.PQ_INDEX,
607+
0,
608+
to_temporal_policy(index_timestamp),
609+
)
610+
index.create_temp_data_group(PARTIAL_WRITE_ARRAY_DIR)
611+
else:
612+
try:
613+
tiledb.group_create(partial_write_array_dir_uri)
614+
add_to_group(
615+
group, partial_write_array_dir_uri, PARTIAL_WRITE_ARRAY_DIR
616+
)
617+
except tiledb.TileDBError as err:
618+
message = str(err)
619+
if "already exists" not in message:
620+
raise err
613621
return tiledb.Group(partial_write_array_dir_uri, "w")
614622

615623
def create_partial_write_array_group(
@@ -779,16 +787,6 @@ def create_arrays(
779787
create_index_array=True,
780788
asset_creation_threads=asset_creation_threads,
781789
)
782-
elif index_type == "IVF_PQ":
783-
ctx = vspy.Ctx(config)
784-
index = vspy.IndexIVFPQ(
785-
ctx,
786-
index_group_uri,
787-
vspy.IndexLoadStrategy.PQ_INDEX,
788-
0,
789-
to_temporal_policy(index_timestamp),
790-
)
791-
index.create_temp_data_group()
792790
# Note that we don't create type-erased indexes (i.e. Vamana) here. Instead we create them
793791
# at very start of ingest() in C++.
794792
elif not is_type_erased_index(index_type):
@@ -1294,9 +1292,13 @@ def ivf_pq_train_udf(
12941292
else np.empty((0, dimensions), dtype=vector_type)
12951293
)
12961294

1297-
# Filter out the updated vectors from the sample vectors.
1295+
# NOTE: We add kind='sort' as a workaround to this bug: https://github.com/numpy/numpy/issues/26922
12981296
updates_filter = np.in1d(
1299-
external_ids, updated_ids, assume_unique=True, invert=True
1297+
external_ids,
1298+
updated_ids,
1299+
assume_unique=True,
1300+
invert=True,
1301+
kind="sort",
13001302
)
13011303
sample_vectors = sample_vectors[updates_filter]
13021304

@@ -1922,6 +1924,7 @@ def ingest_vectors_udf(
19221924
start=part,
19231925
end=part_end,
19241926
partition_start=part_id * (partitions + 1),
1927+
partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
19251928
)
19261929
else:
19271930
ivf_index_tdb(
@@ -1979,6 +1982,7 @@ def ingest_vectors_udf(
19791982
start=part,
19801983
end=part_end,
19811984
partition_start=part_id * (partitions + 1),
1985+
partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
19821986
)
19831987
else:
19841988
ivf_index(
@@ -2069,6 +2073,7 @@ def ingest_additions_udf(
20692073
start=write_offset,
20702074
end=0,
20712075
partition_start=partition_start,
2076+
partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
20722077
)
20732078
else:
20742079
ivf_index(
@@ -2161,7 +2166,12 @@ def ivf_pq_consolidate_partition_udf(
21612166
to_temporal_policy(index_timestamp),
21622167
)
21632168
index.consolidate_partitions(
2164-
partitions, work_items, partition_id_start, partition_id_end, batch
2169+
partitions=partitions,
2170+
work_items=work_items,
2171+
partition_id_start=partition_id_start,
2172+
partition_id_end=partition_id_end,
2173+
batch=batch,
2174+
partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
21652175
)
21662176

21672177
def consolidate_partition_udf(

apis/python/src/tiledb/vector_search/type_erased_module.cc

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,10 @@ void init_type_erased_module(py::module_& m) {
542542
py::arg("temporal_policy") = std::nullopt)
543543
.def(
544544
"create_temp_data_group",
545-
[](IndexIVFPQ& index) { index.create_temp_data_group(); })
545+
[](IndexIVFPQ& index, const std::string& partial_write_array_dir) {
546+
index.create_temp_data_group(partial_write_array_dir);
547+
},
548+
py::arg("partial_write_array_dir"))
546549
.def(
547550
"train",
548551
[](IndexIVFPQ& index,
@@ -577,41 +580,47 @@ void init_type_erased_module(py::module_& m) {
577580
const FeatureVector& deleted_ids,
578581
size_t start,
579582
size_t end,
580-
size_t partition_start) {
583+
size_t partition_start,
584+
const std::string& partial_write_array_dir) {
581585
index.ingest_parts(
582586
input_vectors,
583587
external_ids,
584588
deleted_ids,
585589
start,
586590
end,
587-
partition_start);
591+
partition_start,
592+
partial_write_array_dir);
588593
},
589594
py::arg("input_vectors"),
590595
py::arg("external_ids"),
591596
py::arg("deleted_ids"),
592597
py::arg("start"),
593598
py::arg("end"),
594-
py::arg("partition_start"))
599+
py::arg("partition_start"),
600+
py::arg("partial_write_array_dir"))
595601
.def(
596602
"consolidate_partitions",
597603
[](IndexIVFPQ& index,
598604
size_t partitions,
599605
size_t work_items,
600606
size_t partition_id_start,
601607
size_t partition_id_end,
602-
size_t batch) {
608+
size_t batch,
609+
const std::string& partial_write_array_dir) {
603610
index.consolidate_partitions(
604611
partitions,
605612
work_items,
606613
partition_id_start,
607614
partition_id_end,
608-
batch);
615+
batch,
616+
partial_write_array_dir);
609617
},
610618
py::arg("partitions"),
611619
py::arg("work_items"),
612620
py::arg("partition_id_start"),
613621
py::arg("partition_id_end"),
614-
py::arg("batch"))
622+
py::arg("batch"),
623+
py::arg("partial_write_array_dir"))
615624
.def(
616625
"ingest",
617626
[](IndexIVFPQ& index, const FeatureVectorArray& input_vectors) {

src/include/api/ivf_pq_index.h

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,12 @@ class IndexIVFPQ {
172172
dimensions_ = index_->dimensions();
173173
}
174174

175-
void create_temp_data_group() {
175+
void create_temp_data_group(const std::string& partial_write_array_dir) {
176176
if (!index_) {
177177
throw std::runtime_error(
178178
"Cannot create_temp_data_group() because there is no index.");
179179
}
180-
index_->create_temp_data_group();
180+
index_->create_temp_data_group(partial_write_array_dir);
181181
}
182182

183183
/**
@@ -231,7 +231,8 @@ class IndexIVFPQ {
231231
const FeatureVector& deleted_ids,
232232
size_t start,
233233
size_t end,
234-
size_t partition_start) {
234+
size_t partition_start,
235+
const std::string& partial_write_array_dir) {
235236
if (feature_datatype_ != input_vectors.feature_type()) {
236237
throw std::runtime_error(
237238
"[ivf_pq_index@ingest_parts] Feature datatype mismatch: " +
@@ -243,7 +244,13 @@ class IndexIVFPQ {
243244
"Cannot ingest_parts() because there is no index.");
244245
}
245246
index_->ingest_parts(
246-
input_vectors, external_ids, deleted_ids, start, end, partition_start);
247+
input_vectors,
248+
external_ids,
249+
deleted_ids,
250+
start,
251+
end,
252+
partition_start,
253+
partial_write_array_dir);
247254
}
248255

249256
void ingest(
@@ -266,14 +273,20 @@ class IndexIVFPQ {
266273
size_t work_items,
267274
size_t partition_id_start,
268275
size_t partition_id_end,
269-
size_t batch) {
276+
size_t batch,
277+
const std::string& partial_write_array_dir) {
270278
if (!index_) {
271279
throw std::runtime_error(
272280
"[ivf_pq_index@consolidate_partitions] Cannot "
273281
"consolidate_partitions() because there is no index.");
274282
}
275283
index_->consolidate_partitions(
276-
partitions, work_items, partition_id_start, partition_id_end, batch);
284+
partitions,
285+
work_items,
286+
partition_id_start,
287+
partition_id_end,
288+
batch,
289+
partial_write_array_dir);
277290
}
278291

279292
[[nodiscard]] auto query(
@@ -413,7 +426,8 @@ class IndexIVFPQ {
413426
struct index_base {
414427
virtual ~index_base() = default;
415428

416-
virtual void create_temp_data_group() = 0;
429+
virtual void create_temp_data_group(
430+
const std::string& partial_write_array_dir) = 0;
417431

418432
virtual void train(
419433
const FeatureVectorArray& training_set,
@@ -426,7 +440,8 @@ class IndexIVFPQ {
426440
const FeatureVector& deleted_ids,
427441
size_t start,
428442
size_t end,
429-
size_t partition_start) = 0;
443+
size_t partition_start,
444+
const std::string& partial_write_array_dir) = 0;
430445

431446
virtual void ingest(
432447
const FeatureVectorArray& input_vectors,
@@ -437,7 +452,8 @@ class IndexIVFPQ {
437452
size_t work_items,
438453
size_t partition_id_start,
439454
size_t partition_id_end,
440-
size_t batch) = 0;
455+
size_t batch,
456+
const std::string& partial_write_array_dir) = 0;
441457

442458
[[nodiscard]] virtual std::tuple<FeatureVectorArray, FeatureVectorArray>
443459
query(
@@ -499,8 +515,9 @@ class IndexIVFPQ {
499515
temporal_policy) {
500516
}
501517

502-
void create_temp_data_group() override {
503-
impl_index_.create_temp_data_group();
518+
void create_temp_data_group(
519+
const std::string& partial_write_array_dir) override {
520+
impl_index_.create_temp_data_group(partial_write_array_dir);
504521
}
505522

506523
void train(
@@ -521,7 +538,8 @@ class IndexIVFPQ {
521538
const FeatureVector& deleted_ids,
522539
size_t start,
523540
size_t end,
524-
size_t partition_start) override {
541+
size_t partition_start,
542+
const std::string& partial_write_array_dir) override {
525543
using feature_type = typename T::feature_type;
526544
using id_type = typename T::id_type;
527545
auto fspan = MatrixView<feature_type, stdx::layout_left>{
@@ -534,7 +552,13 @@ class IndexIVFPQ {
534552
auto ids = std::vector<id_type>(::num_vectors(input_vectors));
535553
std::iota(ids.begin(), ids.end(), start);
536554
impl_index_.ingest_parts(
537-
fspan, ids, deleted_ids_span, start, end, partition_start);
555+
fspan,
556+
ids,
557+
deleted_ids_span,
558+
start,
559+
end,
560+
partition_start,
561+
partial_write_array_dir);
538562
} else {
539563
auto external_ids_span = std::span<id_type>(
540564
(id_type*)external_ids.data(), external_ids.dimensions());
@@ -544,7 +568,8 @@ class IndexIVFPQ {
544568
deleted_ids_span,
545569
start,
546570
end,
547-
partition_start);
571+
partition_start,
572+
partial_write_array_dir);
548573
}
549574
}
550575

@@ -573,9 +598,15 @@ class IndexIVFPQ {
573598
size_t work_items,
574599
size_t partition_id_start,
575600
size_t partition_id_end,
576-
size_t batch) override {
601+
size_t batch,
602+
const std::string& partial_write_array_dir) override {
577603
impl_index_.consolidate_partitions(
578-
partitions, work_items, partition_id_start, partition_id_end, batch);
604+
partitions,
605+
work_items,
606+
partition_id_start,
607+
partition_id_end,
608+
batch,
609+
partial_write_array_dir);
579610
}
580611

581612
/**

0 commit comments

Comments
 (0)