Skip to content

Commit 1756645

Browse files
committed
Rename array_uri -> index_uri, and clarify internal variable names
1 parent 1b568f2 commit 1756645

File tree

5 files changed

+77
-68
lines changed

5 files changed

+77
-68
lines changed

apis/python/src/tiledb/vector_search/index.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def __init__(
134134
self.centroids_uri = group[
135135
storage_formats[self.storage_version]["CENTROIDS_ARRAY_NAME"]
136136
].uri
137-
self.index_uri = group[
137+
self.index_array_uri = group[
138138
storage_formats[self.storage_version]["INDEX_ARRAY_NAME"]
139139
].uri
140140
self.ids_uri = group[
@@ -145,7 +145,7 @@ def __init__(
145145
self._centroids = load_as_matrix(
146146
self.centroids_uri, ctx=self.ctx, config=config
147147
)
148-
self._index = read_vector_u64(self.ctx, self.index_uri)
148+
self._index = read_vector_u64(self.ctx, self.index_array_uri)
149149

150150
# TODO pass in a context
151151
if self.memory_budget == -1:

apis/python/src/tiledb/vector_search/ingestion.py

Lines changed: 52 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
def ingest(
99
index_type: str,
10-
array_uri: str,
10+
index_uri: str,
1111
source_uri: str,
1212
source_type: str,
1313
*,
@@ -30,8 +30,8 @@ def ingest(
3030
----------
3131
index_type: str
3232
Type of vector index (FLAT, IVF_FLAT)
33-
array_uri: str
34-
Vector array URI
33+
index_uri: str
34+
Vector index URI (stored as TileDB group)
3535
source_uri: str
3636
Data source URI
3737
source_type: str
@@ -81,6 +81,9 @@ def ingest(
8181
from tiledb.cloud.utilities import set_aws_context
8282
from tiledb.vector_search.storage_formats import storage_formats, STORAGE_VERSION
8383

84+
# use index_group_uri for internal clarity
85+
index_group_uri = index_uri
86+
8487
CENTROIDS_ARRAY_NAME = storage_formats[STORAGE_VERSION]["CENTROIDS_ARRAY_NAME"]
8588
INDEX_ARRAY_NAME = storage_formats[STORAGE_VERSION]["INDEX_ARRAY_NAME"]
8689
IDS_ARRAY_NAME = storage_formats[STORAGE_VERSION]["IDS_ARRAY_NAME"]
@@ -232,7 +235,7 @@ def create_arrays(
232235

233236
elif index_type == "IVF_FLAT":
234237
centroids_uri = f"{group.uri}/{CENTROIDS_ARRAY_NAME}"
235-
index_uri = f"{group.uri}/{INDEX_ARRAY_NAME}"
238+
index_array_uri = f"{group.uri}/{INDEX_ARRAY_NAME}"
236239
ids_uri = f"{group.uri}/{IDS_ARRAY_NAME}"
237240
parts_uri = f"{group.uri}/{PARTS_ARRAY_NAME}"
238241
partial_write_array_dir_uri = f"{group.uri}/{PARTIAL_WRITE_ARRAY_DIR}"
@@ -280,7 +283,7 @@ def create_arrays(
280283
tiledb.Array.create(centroids_uri, centroids_schema)
281284
group.add(centroids_uri, name=CENTROIDS_ARRAY_NAME)
282285

283-
if not tiledb.array_exists(index_uri):
286+
if not tiledb.array_exists(index_array_uri):
284287
logger.debug("Creating index array")
285288
index_array_rows_dim = tiledb.Dim(
286289
name="rows",
@@ -303,8 +306,8 @@ def create_arrays(
303306
tile_order="col-major",
304307
)
305308
logger.debug(index_schema)
306-
tiledb.Array.create(index_uri, index_schema)
307-
group.add(index_uri, name=INDEX_ARRAY_NAME)
309+
tiledb.Array.create(index_array_uri, index_schema)
310+
group.add(index_array_uri, name=INDEX_ARRAY_NAME)
308311

309312
if not tiledb.array_exists(ids_uri):
310313
logger.debug("Creating ids array")
@@ -582,14 +585,14 @@ def read_input_vectors(
582585
# --------------------------------------------------------------------
583586

584587
def copy_centroids(
585-
array_uri: str,
588+
index_group_uri: str,
586589
copy_centroids_uri: str,
587590
config: Optional[Mapping[str, Any]] = None,
588591
verbose: bool = False,
589592
trace_id: Optional[str] = None,
590593
):
591594
logger = setup(config, verbose)
592-
group = tiledb.Group(array_uri)
595+
group = tiledb.Group(index_group_uri)
593596
centroids_uri = group[CENTROIDS_ARRAY_NAME].uri
594597
logger.debug(
595598
"Copying centroids from: %s, to: %s", copy_centroids_uri, centroids_uri
@@ -604,7 +607,7 @@ def copy_centroids(
604607
# centralised kmeans UDFs
605608
# --------------------------------------------------------------------
606609
def centralised_kmeans(
607-
array_uri: str,
610+
index_group_uri: str,
608611
source_uri: str,
609612
source_type: str,
610613
vector_type: np.dtype,
@@ -623,7 +626,7 @@ def centralised_kmeans(
623626

624627
with tiledb.scope_ctx(ctx_or_config=config):
625628
logger = setup(config, verbose)
626-
group = tiledb.Group(array_uri)
629+
group = tiledb.Group(index_group_uri)
627630
centroids_uri = group[CENTROIDS_ARRAY_NAME].uri
628631
sample_vectors = read_input_vectors(
629632
source_uri=source_uri,
@@ -811,7 +814,7 @@ def compute_new_centroids(*argv):
811814
return np.mean(argv, axis=0).astype(np.float32)
812815

813816
def ingest_flat(
814-
array_uri: str,
817+
index_group_uri: str,
815818
source_uri: str,
816819
source_type: str,
817820
vector_type: np.dtype,
@@ -829,7 +832,7 @@ def ingest_flat(
829832

830833
logger = setup(config, verbose)
831834
with tiledb.scope_ctx(ctx_or_config=config):
832-
group = tiledb.Group(array_uri)
835+
group = tiledb.Group(index_group_uri)
833836
parts_array_uri = group[PARTS_ARRAY_NAME].uri
834837
target = tiledb.open(parts_array_uri, mode="w")
835838
logger.debug("Input vectors start_pos: %d, end_pos: %d", start, end)
@@ -857,7 +860,7 @@ def ingest_flat(
857860

858861
def write_centroids(
859862
centroids: np.array,
860-
array_uri: str,
863+
index_group_uri: str,
861864
partitions: int,
862865
dimensions: int,
863866
config: Optional[Mapping[str, Any]] = None,
@@ -866,7 +869,7 @@ def write_centroids(
866869
):
867870
with tiledb.scope_ctx(ctx_or_config=config):
868871
logger = setup(config, verbose)
869-
group = tiledb.Group(array_uri)
872+
group = tiledb.Group(index_group_uri)
870873
centroids_uri = group[CENTROIDS_ARRAY_NAME].uri
871874
logger.debug("Writing centroids to array %s", centroids_uri)
872875
with tiledb.open(centroids_uri, mode="w") as A:
@@ -876,7 +879,7 @@ def write_centroids(
876879
# vector ingestion UDFs
877880
# --------------------------------------------------------------------
878881
def ingest_vectors_udf(
879-
array_uri: str,
882+
index_group_uri: str,
880883
source_uri: str,
881884
source_type: str,
882885
vector_type: np.dtype,
@@ -898,7 +901,7 @@ def ingest_vectors_udf(
898901
)
899902

900903
logger = setup(config, verbose)
901-
group = tiledb.Group(array_uri)
904+
group = tiledb.Group(index_uri)
902905
centroids_uri = group[CENTROIDS_ARRAY_NAME].uri
903906
partial_write_array_dir_uri = group[PARTIAL_WRITE_ARRAY_DIR].uri
904907
partial_write_array_group = tiledb.Group(partial_write_array_dir_uri)
@@ -929,7 +932,7 @@ def ingest_vectors_udf(
929932
db_uri=source_uri,
930933
centroids_uri=centroids_uri,
931934
parts_uri=partial_write_array_parts_uri,
932-
index_uri=partial_write_array_index_uri,
935+
index_array_uri=partial_write_array_index_uri,
933936
id_uri=partial_write_array_ids_uri,
934937
start=part,
935938
end=part_end,
@@ -954,7 +957,7 @@ def ingest_vectors_udf(
954957
db=array_to_matrix(np.transpose(in_vectors).astype(vector_type)),
955958
centroids_uri=centroids_uri,
956959
parts_uri=partial_write_array_parts_uri,
957-
index_uri=partial_write_array_index_uri,
960+
index_array_uri=partial_write_array_index_uri,
958961
id_uri=partial_write_array_ids_uri,
959962
start=part,
960963
end=part_end,
@@ -963,15 +966,15 @@ def ingest_vectors_udf(
963966
)
964967

965968
def compute_partition_indexes_udf(
966-
array_uri: str,
969+
index_group_uri: str,
967970
partitions: int,
968971
config: Optional[Mapping[str, Any]] = None,
969972
verbose: bool = False,
970973
trace_id: Optional[str] = None,
971974
):
972975
logger = setup(config, verbose)
973976
with tiledb.scope_ctx(ctx_or_config=config):
974-
group = tiledb.Group(array_uri)
977+
group = tiledb.Group(index_group_uri)
975978
index_array_uri = group[INDEX_ARRAY_NAME].uri
976979
partial_write_array_dir_uri = group[PARTIAL_WRITE_ARRAY_DIR].uri
977980
partial_write_array_group = tiledb.Group(partial_write_array_dir_uri)
@@ -1007,7 +1010,7 @@ def compute_partition_indexes_udf(
10071010
index_array[:] = indexes
10081011

10091012
def consolidate_partition_udf(
1010-
array_uri: str,
1013+
index_group_uri: str,
10111014
partition_id_start: int,
10121015
partition_id_end: int,
10131016
batch: int,
@@ -1021,7 +1024,7 @@ def consolidate_partition_udf(
10211024
logger.debug(
10221025
"Consolidating partitions %d-%d", partition_id_start, partition_id_end
10231026
)
1024-
group = tiledb.Group(array_uri)
1027+
group = tiledb.Group(index_group_uri)
10251028
partial_write_array_dir_uri = group[PARTIAL_WRITE_ARRAY_DIR].uri
10261029
partial_write_array_group = tiledb.Group(partial_write_array_dir_uri)
10271030
partial_write_array_ids_uri = partial_write_array_group[IDS_ARRAY_NAME].uri
@@ -1119,7 +1122,7 @@ def submit_local(d, func, *args, **kwargs):
11191122

11201123
def create_ingestion_dag(
11211124
index_type: str,
1122-
array_uri: str,
1125+
index_group_uri: str,
11231126
source_uri: str,
11241127
source_type: str,
11251128
vector_type: np.dtype,
@@ -1174,7 +1177,7 @@ def create_ingestion_dag(
11741177
end = size
11751178
ingest_node = submit(
11761179
ingest_flat,
1177-
array_uri=array_uri,
1180+
index_group_uri=index_group_uri,
11781181
source_uri=source_uri,
11791182
source_type=source_type,
11801183
vector_type=vector_type,
@@ -1195,7 +1198,7 @@ def create_ingestion_dag(
11951198
if copy_centroids_uri is not None:
11961199
centroids_node = submit(
11971200
copy_centroids,
1198-
array_uri=array_uri,
1201+
index_group_uri=index_group_uri,
11991202
copy_centroids_uri=copy_centroids_uri,
12001203
config=config,
12011204
verbose=verbose,
@@ -1208,7 +1211,7 @@ def create_ingestion_dag(
12081211
if training_sample_size <= CENTRALISED_KMEANS_MAX_SAMPLE_SIZE:
12091212
centroids_node = submit(
12101213
centralised_kmeans,
1211-
array_uri=array_uri,
1214+
index_group_uri=index_group_uri,
12121215
source_uri=source_uri,
12131216
source_type=source_type,
12141217
vector_type=vector_type,
@@ -1291,7 +1294,7 @@ def create_ingestion_dag(
12911294
centroids_node = submit(
12921295
write_centroids,
12931296
centroids=internal_centroids_node,
1294-
array_uri=array_uri,
1297+
index_group_uri=index_group_uri,
12951298
partitions=partitions,
12961299
dimensions=dimensions,
12971300
config=config,
@@ -1304,7 +1307,7 @@ def create_ingestion_dag(
13041307

13051308
compute_indexes_node = submit(
13061309
compute_partition_indexes_udf,
1307-
array_uri=array_uri,
1310+
index_group_uri=index_group_uri,
13081311
partitions=partitions,
13091312
config=config,
13101313
verbose=verbose,
@@ -1322,7 +1325,7 @@ def create_ingestion_dag(
13221325
end = size
13231326
ingest_node = submit(
13241327
ingest_vectors_udf,
1325-
array_uri=array_uri,
1328+
index_group_uri=index_group_uri,
13261329
source_uri=source_uri,
13271330
source_type=source_type,
13281331
vector_type=vector_type,
@@ -1354,7 +1357,7 @@ def create_ingestion_dag(
13541357
end = partitions
13551358
consolidate_partition_node = submit(
13561359
consolidate_partition_udf,
1357-
array_uri=array_uri,
1360+
index_group_uri=index_group_uri,
13581361
partition_id_start=start,
13591362
partition_id_end=end,
13601363
batch=table_partitions_per_work_item,
@@ -1373,39 +1376,45 @@ def create_ingestion_dag(
13731376
raise ValueError(f"Not supported index_type {index_type}")
13741377

13751378
def consolidate_and_vacuum(
1376-
array_uri: str,
1379+
index_group_uri: str,
13771380
config: Optional[Mapping[str, Any]] = None,
13781381
):
13791382
modes = ["fragment_meta", "commits", "array_meta"]
13801383
for mode in modes:
13811384
conf = tiledb.Config(config)
13821385
conf["sm.consolidation.mode"] = mode
13831386
conf["sm.vacuum.mode"] = mode
1384-
group = tiledb.Group(array_uri, config=conf)
1387+
group = tiledb.Group(index_group_uri, config=conf)
13851388
tiledb.consolidate(group[PARTS_ARRAY_NAME].uri, config=conf)
13861389
tiledb.vacuum(group[PARTS_ARRAY_NAME].uri, config=conf)
13871390
if index_type == "IVF_FLAT":
13881391
tiledb.consolidate(group[IDS_ARRAY_NAME].uri, config=conf)
13891392
tiledb.vacuum(group[IDS_ARRAY_NAME].uri, config=conf)
13901393

13911394
# TODO remove temp data for tiledb URIs
1392-
if not array_uri.startswith("tiledb://"):
1395+
if not index_group_uri.startswith("tiledb://"):
13931396
vfs = tiledb.VFS(config)
1394-
partial_write_array_dir_uri = array_uri + "/" + PARTIAL_WRITE_ARRAY_DIR
1397+
partial_write_array_dir_uri = index_group_uri + "/" + PARTIAL_WRITE_ARRAY_DIR
13951398
if vfs.is_dir(partial_write_array_dir_uri):
13961399
vfs.remove_dir(partial_write_array_dir_uri)
13971400

1401+
1402+
# --------------------------------------------------------------------
1403+
# End internal function definitions
1404+
# --------------------------------------------------------------------
1405+
1406+
13981407
with tiledb.scope_ctx(ctx_or_config=config):
13991408
logger = setup(config, verbose)
1400-
logger.debug("Ingesting Vectors into %r", array_uri)
1409+
logger.debug("Ingesting Vectors into %r", index_uri)
14011410
try:
1402-
tiledb.group_create(array_uri)
1411+
tiledb.group_create(index_group_uri)
14031412
except tiledb.TileDBError as err:
14041413
message = str(err)
14051414
if "already exists" in message:
1406-
logger.debug(f"Group '{array_uri}' already exists")
1415+
logger.debug(f"Group '{index_group_uri}' already exists")
14071416
raise err
1408-
group = tiledb.Group(array_uri, "w")
1417+
group = tiledb.Group(index_group_uri, "w")
14091418

14101419
in_size, dimensions, vector_type = read_source_metadata(
14111420
source_uri=source_uri, source_type=source_type, logger=logger
@@ -1492,7 +1501,7 @@ def consolidate_and_vacuum(
14921501
logger.debug("Creating ingestion graph")
14931502
d = create_ingestion_dag(
14941503
index_type=index_type,
1495-
array_uri=array_uri,
1504+
index_group_uri=index_group_uri,
14961505
source_uri=source_uri,
14971506
source_type=source_type,
14981507
vector_type=vector_type,
@@ -1515,9 +1524,9 @@ def consolidate_and_vacuum(
15151524
d.compute()
15161525
logger.debug("Submitted ingestion graph")
15171526
d.wait()
1518-
consolidate_and_vacuum(array_uri=array_uri, config=config)
1527+
consolidate_and_vacuum(index_group_uri=index_group_uri, config=config)
15191528

15201529
if index_type == "FLAT":
1521-
return FlatIndex(uri=array_uri, config=config)
1530+
return FlatIndex(uri=index_group_uri, config=config)
15221531
elif index_type == "IVF_FLAT":
1523-
return IVFFlatIndex(uri=array_uri, memory_budget=1000000, config=config)
1532+
return IVFFlatIndex(uri=index_group_uri, memory_budget=1000000, config=config)

0 commit comments

Comments
 (0)