Commit 209dafa

Add support for TileDB sparse arrays in vector ingestion (#262)
This adds support for TileDB sparse arrays as input in vector ingestion. It also:

- Removes the attribute name `values` requirement for dense arrays. Instead we assume that the values are in the first attribute of the array schema.
- Fixes a bug in random sampling where we used `in_size` instead of `size` to compute the sampling tasks.

The sparse array input functionality can be used to facilitate the ingestion of single cell census embeddings (https://cellxgene.cziscience.com/census-models).
1 parent ccdaff8 commit 209dafa
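As a hedged usage sketch: only the `TILEDB_SPARSE_ARRAY` source type comes from this diff; the `ingest` entry point, its keyword names, the index type, and the URIs below are assumptions about the library's Python ingestion API, made up for illustration. Ingesting from a 2D sparse TileDB array of embeddings could look roughly like:

    # Hypothetical usage sketch; URIs and the index type are made up, and the
    # ingest() keyword names are assumptions about the Python API rather than
    # something shown in this commit.
    from tiledb.vector_search.ingestion import ingest

    index = ingest(
        index_type="IVF_FLAT",                         # assumed index type
        index_uri="s3://my-bucket/census-index",       # hypothetical output location
        source_uri="s3://my-bucket/census-embeddings", # hypothetical 2D sparse TileDB array
        source_type="TILEDB_SPARSE_ARRAY",             # the new source type added here
    )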


apis/python/src/tiledb/vector_search/ingestion.py

Lines changed: 33 additions & 8 deletions
@@ -320,7 +320,12 @@ def read_source_metadata(
         schema = tiledb.ArraySchema.load(source_uri)
         size = schema.domain.dim(1).domain[1] + 1
         dimensions = schema.domain.dim(0).domain[1] + 1
-        return size, dimensions, schema.attr("values").dtype
+        return size, dimensions, schema.attr(0).dtype
+    if source_type == "TILEDB_SPARSE_ARRAY":
+        schema = tiledb.ArraySchema.load(source_uri)
+        size = schema.domain.dim(0).domain[1] + 1
+        dimensions = schema.domain.dim(1).domain[1] + 1
+        return size, dimensions, schema.attr(0).dtype
     if source_type == "TILEDB_PARTITIONED_ARRAY":
         with tiledb.open(source_uri, "r", config=config) as source_array:
             q = source_array.query(attrs=("vectors_shape",), coords=True)
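For reference, here is a minimal, self-contained sketch of what the new sparse-array metadata branch computes, using a toy 2D sparse array whose URI, domain sizes, attribute name, and dtype are made up for illustration:

    # Build a toy 2D sparse TileDB array and derive the same metadata the new
    # TILEDB_SPARSE_ARRAY branch computes. Names and shapes are hypothetical.
    import numpy as np
    import tiledb

    uri = "/tmp/toy_sparse_embeddings"  # hypothetical local URI
    dom = tiledb.Domain(
        tiledb.Dim(name="vector_id", domain=(0, 999), tile=100, dtype=np.int32),
        tiledb.Dim(name="dim_id", domain=(0, 127), tile=128, dtype=np.int32),
    )
    schema = tiledb.ArraySchema(
        domain=dom,
        sparse=True,
        attrs=[tiledb.Attr(name="value", dtype=np.float32)],
    )
    tiledb.Array.create(uri, schema)

    loaded = tiledb.ArraySchema.load(uri)
    size = loaded.domain.dim(0).domain[1] + 1        # 1000 vectors (dimension 0)
    dimensions = loaded.domain.dim(1).domain[1] + 1  # 128 components (dimension 1)
    vector_type = loaded.attr(0).dtype               # float32, first attribute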
@@ -377,7 +382,7 @@ def read_source_metadata(
         return size, dimensions, np.uint8
     else:
         raise ValueError(
-            f"Not supported source_type {source_type} - valid types are [TILEDB_ARRAY, U8BIN, F32BIN, FVEC, IVEC, BVEC]"
+            f"Not supported source_type {source_type} - valid types are [TILEDB_ARRAY, TILEDB_SPARSE_ARRAY, U8BIN, F32BIN, FVEC, IVEC, BVEC]"
         )
 
 def create_array(
@@ -838,9 +843,29 @@ def read_input_vectors(
             with tiledb.open(
                 source_uri, mode="r", timestamp=index_timestamp
             ) as src_array:
+                src_array_schema = src_array.schema
                 return np.transpose(
-                    src_array[0:dimensions, start_pos:end_pos]["values"]
+                    src_array[0:dimensions, start_pos:end_pos][
+                        src_array_schema.attr(0).name
+                    ]
                 ).copy(order="C")
+        if source_type == "TILEDB_SPARSE_ARRAY":
+            from scipy.sparse import coo_matrix
+
+            with tiledb.open(
+                source_uri, mode="r", timestamp=index_timestamp
+            ) as src_array:
+                src_array_schema = src_array.schema
+                data = src_array[start_pos:end_pos, 0:dimensions]
+                return coo_matrix(
+                    (
+                        data[src_array_schema.attr(0).name],
+                        (
+                            data[src_array_schema.domain.dim(0).name] - start_pos,
+                            data[src_array_schema.domain.dim(1).name],
+                        ),
+                    )
+                ).toarray()
         elif source_type == "TILEDB_PARTITIONED_ARRAY":
             with tiledb.open(
                 source_uri, "r", timestamp=index_timestamp, config=config
@@ -1958,15 +1983,15 @@ def create_ingestion_dag(
         idx = 0
         num_sampled = 0
         for start in range(
-            0, in_size, input_vectors_batch_size_during_sampling
+            0, size, input_vectors_batch_size_during_sampling
         ):
             # What vectors to read from the source_uri.
             end = start + input_vectors_batch_size_during_sampling
             if end > size:
                 end = size
 
             # How many vectors to sample from the vectors read.
-            percent_of_data_to_read = (end - start) / in_size
+            percent_of_data_to_read = (end - start) / size
             num_to_sample = math.ceil(
                 training_sample_size * percent_of_data_to_read
             )
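With the fix, the sampling batches cover exactly the `size` vectors being ingested, so the per-batch sample counts are proportional to each batch's share of that data. A small self-contained sketch of the arithmetic, with assumed values for the dataset size, batch size, and training sample size:

    # Sketch of the sampling-task arithmetic after the fix: batches run over
    # `size` (not `in_size`), so the per-batch samples add up to roughly
    # training_sample_size. All numbers below are assumptions for illustration.
    import math

    size = 1_000_000
    input_vectors_batch_size_during_sampling = 300_000
    training_sample_size = 10_000

    total_sampled = 0
    for start in range(0, size, input_vectors_batch_size_during_sampling):
        end = min(start + input_vectors_batch_size_during_sampling, size)
        percent_of_data_to_read = (end - start) / size
        num_to_sample = math.ceil(training_sample_size * percent_of_data_to_read)
        total_sampled += num_to_sample
        print(start, end, num_to_sample)

    print(total_sampled)  # >= training_sample_size because of the per-batch ceil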
@@ -2331,9 +2356,9 @@ def consolidate_and_vacuum(
     logger.debug("Input dataset size %d", size)
     logger.debug("Input dataset dimensions %d", dimensions)
     logger.debug("Vector dimension type %s", vector_type)
-    if training_sample_size > in_size:
+    if training_sample_size > size:
         raise ValueError(
-            f"training_sample_size {training_sample_size} is larger than the input dataset size {in_size}"
+            f"training_sample_size {training_sample_size} is larger than the input dataset size {size}"
         )
 
     if partitions == -1:
@@ -2358,7 +2383,7 @@ def consolidate_and_vacuum(
         external_ids_uri = write_external_ids(
             group=group,
             external_ids=external_ids,
-            size=in_size,
+            size=size,
             partitions=partitions,
         )
         external_ids_type = "TILEDB_ARRAY"
