Commit bd43d28

Clean-up ingestion.py and reduce non-verbose log messages. (#82)
* Fix double imports.
* Log the schemas of the created arrays in debug mode.
* Use percent formatting for the log messages.
* Do not redefine the built-in function `sum`.
* Downgrade many log messages to debug severity.
1 parent: a4436be · commit: bd43d28
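As background for the "percent formatting" and built-in `sum` items above, a minimal sketch of the two patterns the commit applies (hypothetical, not taken from ingestion.py; the logger name and values are illustrative): passing %-style arguments so the logging module defers string formatting until a record is actually emitted, and naming the accumulator `_sum` so the built-in sum() is not shadowed.

import logging

logger = logging.getLogger("ingestion-example")  # illustrative name only
logging.basicConfig(level=logging.INFO)

start_pos, end_pos = 0, 1000

# Eager: the f-string is always built, even though DEBUG records are dropped here.
logger.debug(f"Reading input vectors start_pos: {start_pos}, end_pos: {end_pos}")

# Lazy: logging interpolates the %-style arguments only if the record is emitted.
logger.debug("Reading input vectors start_pos: %d, end_pos: %d", start_pos, end_pos)

# Accumulate sizes without rebinding the built-in sum().
_sum = 0
for partition_size in [10, 20, 30]:
    _sum += partition_size
print(_sum, sum([10, 20, 30]))  # sum() is still the built-in

Deferred formatting matters most for messages emitted inside loops, where building strings for suppressed DEBUG records would add avoidable overhead.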


apis/python/src/tiledb/vector_search/ingestion.py

Lines changed: 42 additions & 60 deletions
@@ -69,15 +69,14 @@ def ingest(
 import enum
 import logging
 import math
-from typing import Any, Mapping, Optional
+from typing import Any, Mapping
 import multiprocessing
 import os
 
 import numpy as np
 
 import tiledb
 from tiledb.cloud import dag
-from tiledb.cloud.dag import Mode
 from tiledb.cloud.rest_api import models
 from tiledb.cloud.utilities import get_logger
 from tiledb.cloud.utilities import set_aws_context
@@ -423,7 +422,7 @@ def read_input_vectors(
 trace_id: Optional[str] = None,
 ) -> np.array:
 logger = setup(config, verbose)
-logger.info(f"Reading input vectors start_pos: {start_pos}, end_pos: {end_pos}")
+logger.debug("Reading input vectors start_pos: %i, end_pos: %i", start_pos, end_pos)
 if source_type == "TILEDB_ARRAY":
 with tiledb.open(source_uri, mode="r") as src_array:
 return np.transpose(
@@ -512,9 +511,7 @@ def copy_centroids(
 logger = setup(config, verbose)
 group = tiledb.Group(array_uri)
 centroids_uri = group[CENTROIDS_ARRAY_NAME].uri
-logger.info(
-f"Copying centroids from: {copy_centroids_uri}, to: {centroids_uri}"
-)
+logger.debug("Copying centroids from: %s, to: %s", copy_centroids_uri, centroids_uri)
 src = tiledb.open(copy_centroids_uri, mode="r")
 dest = tiledb.open(centroids_uri, mode="w")
 src_centroids = src[:, :]
@@ -560,7 +557,7 @@ def centralised_kmeans(
 verb = 0
 if verbose:
 verb = 3
-logger.info("Start kmeans training")
+logger.debug("Start kmeans training")
 km = KMeans(
 n_clusters=partitions,
 init=init,
@@ -569,7 +566,7 @@ def centralised_kmeans(
 n_init=n_init,
 )
 km.fit_predict(sample_vectors)
-logger.info(f"Writing centroids to array {centroids_uri}")
+logger.debug("Writing centroids to array %s", centroids_uri)
 with tiledb.open(centroids_uri, mode="w") as A:
 A[0:dimensions, 0:partitions] = np.transpose(
 np.array(km.cluster_centers_)
@@ -589,9 +586,7 @@ def init_centroids(
 trace_id: Optional[str] = None,
 ) -> np.array:
 logger = setup(config, verbose)
-logger.info(
-"Initialising centroids by reading the first vectors in the source data."
-)
+logger.debug("Initialising centroids by reading the first vectors in the source data.")
 with tiledb.scope_ctx(ctx_or_config=config):
 return read_input_vectors(
 source_uri=source_uri,
@@ -632,7 +627,7 @@ def generate_new_centroid_per_thread(
 new_centroid_count = np.ones(len(cents_t))
 for vector_id in range(start, end):
 if vector_id % 100000 == 0:
-logger.debug(f"Vectors computed: {vector_id}")
+logger.debug("Vectors computed: %d", vector_id)
 c_id = assignments_t[vector_id]
 if new_centroid_count[c_id] == 1:
 new_centroid_sums[c_id] = vectors_t[vector_id]
@@ -642,13 +637,13 @@ def generate_new_centroid_per_thread(
 new_centroid_count[c_id] += 1
 new_centroid_sums_queue.put(new_centroid_sums)
 new_centroid_counts_queue.put(new_centroid_count)
-logger.debug(f"Finished thread: {thread_id}")
+logger.debug("Finished thread: %d", thread_id)
 
 def update_centroids():
 import multiprocessing as mp
 
-logger.info("Updating centroids based on assignments.")
-logger.info(f"Using {threads} threads.")
+logger.debug("Updating centroids based on assignments.")
+logger.debug("Using %d threads.", threads)
 global cents_t, vectors_t, assignments_t, new_centroid_thread_sums, new_centroid_thread_counts
 cents_t = centroids
 vectors_t = vectors
@@ -687,7 +682,7 @@ def update_centroids():
 )
 workers[i].join()
 
-logger.info("Finished all threads, aggregating partial results.")
+logger.debug("Finished all threads, aggregating partial results.")
 new_centroids = []
 for c_id in range(partitions):
 cent = []
@@ -703,7 +698,7 @@ def update_centroids():
 
 logger = setup(config, verbose)
 with tiledb.scope_ctx(ctx_or_config=config):
-logger.info("Reading input vectors.")
+logger.debug("Reading input vectors.")
 vectors = read_input_vectors(
 source_uri=source_uri,
 source_type=source_type,
@@ -715,15 +710,15 @@ def update_centroids():
 verbose=verbose,
 trace_id=trace_id,
 )
-logger.debug(f"Input centroids: {centroids[0:5]}")
-logger.info("Assigning vectors to centroids")
+logger.debug("Input centroids: %s", centroids[0:5])
+logger.debug("Assigning vectors to centroids")
 km = KMeans()
 km._n_threads = threads
 km.cluster_centers_ = centroids
 assignments = km.predict(vectors)
-logger.debug(f"Assignments: {assignments[0:100]}")
+logger.debug("Assignments: %s", assignments[0:100])
 partial_new_centroids = update_centroids()
-logger.debug(f"New centroids: {partial_new_centroids[0:5]}")
+logger.debug("New centroids: %s", partial_new_centroids[0:5])
 return partial_new_centroids
 
 def compute_new_centroids(*argv):
@@ -753,7 +748,7 @@ def ingest_flat(
 group = tiledb.Group(array_uri)
 parts_array_uri = group[PARTS_ARRAY_NAME].uri
 target = tiledb.open(parts_array_uri, mode="w")
-logger.info(f"Input vectors start_pos: {start}, end_pos: {end}")
+logger.debug("Input vectors start_pos: %d, end_pos: %d", start, end)
 
 for part in range(start, end, batch):
 part_end = part + batch
@@ -771,8 +766,8 @@ def ingest_flat(
 trace_id=trace_id,
 )
 
-logger.debug(f"Vector read:{len(in_vectors)}")
-logger.info(f"Writing data to array {parts_array_uri}")
+logger.debug("Vector read: %d", len(in_vectors))
+logger.debug("Writing data to array %s", parts_array_uri)
 target[0:dimensions, start:end] = np.transpose(in_vectors)
 target.close()
 
@@ -789,7 +784,7 @@ def write_centroids(
 logger = setup(config, verbose)
 group = tiledb.Group(array_uri)
 centroids_uri = group[CENTROIDS_ARRAY_NAME].uri
-logger.info(f"Writing centroids to array {centroids_uri}")
+logger.debug("Writing centroids to array %s", centroids_uri)
 with tiledb.open(centroids_uri, mode="w") as A:
 A[0:dimensions, 0:partitions] = np.transpose(np.array(centroids))
 
@@ -836,7 +831,7 @@ def ingest_vectors_udf(
 partial_write_array_index_uri = (
 partial_write_array_dir_uri + "/" + INDEX_ARRAY_NAME + "/" + part_name
 )
-logger.info(f"Input vectors start_pos: {part}, end_pos: {part_end}")
+logger.debug("Input vectors start_pos: %d, end_pos: %d", part, part_end)
 if source_type == "TILEDB_ARRAY":
 logger.debug("Start indexing")
 ivf_index_tdb(
@@ -903,15 +898,15 @@ def compute_partition_indexes_udf(
 partition_sizes[i] += int(partial_index) - int(prev_index)
 prev_index = partial_index
 i += 1
-logger.debug(f"Partition sizes: {partition_sizes}")
+logger.debug("Partition sizes: %s", partition_sizes)
 i = 0
-sum = 0
+_sum = 0
 for partition_size in partition_sizes:
-indexes[i] = sum
-sum += partition_size
+indexes[i] = _sum
+_sum += partition_size
 i += 1
-indexes[i] = sum
-logger.debug(f"Partition indexes: {indexes}")
+indexes[i] = _sum
+logger.debug("Partition indexes: %d", indexes)
 index_array = tiledb.open(index_array_uri, mode="w")
 index_array[:] = indexes
 
@@ -927,9 +922,7 @@ def consolidate_partition_udf(
 ):
 logger = setup(config, verbose)
 with tiledb.scope_ctx(ctx_or_config=config):
-logger.info(
-f"Consolidating partitions {partition_id_start}-{partition_id_end}"
-)
+logger.debug("Consolidating partitions %d-%d", partition_id_start, partition_id_end)
 group = tiledb.Group(array_uri)
 partial_write_array_dir_uri = array_uri + "/" + PARTIAL_WRITE_ARRAY_DIR
 partial_write_array_ids_uri = (
@@ -969,14 +962,12 @@ def consolidate_partition_udf(
 index_array = tiledb.open(index_array_uri, mode="r")
 ids_array = tiledb.open(ids_array_uri, mode="w")
 parts_array = tiledb.open(parts_array_uri, mode="w")
-logger.debug(
-f"Partitions start: {partition_id_start} end: {partition_id_end}"
-)
+logger.debug("Partitions start: %d end: %d", partition_id_start, partition_id_end)
 for part in range(partition_id_start, partition_id_end, batch):
 part_end = part + batch
 if part_end > partition_id_end:
 part_end = partition_id_end
-logger.info(f"Consolidating partitions start: {part} end: {part_end}")
+logger.debug("Consolidating partitions start: %d end: %d", part, part_end)
 read_slices = []
 for p in range(part, part_end):
 for partition_slice in partition_slices[p]:
@@ -988,21 +979,20 @@ def consolidate_partition_udf(
 if start_pos != end_pos:
 raise ValueError("Incorrect partition size.")
 continue
-logger.debug(f"Read slices: {read_slices}")
+logger.debug("Read slices: %s", read_slices)
 ids = partial_write_array_ids_array.multi_index[read_slices]["values"]
 vectors = partial_write_array_parts_array.multi_index[:, read_slices][
 "values"
 ]
 
-logger.debug(
-f"Ids shape {ids.shape}, expected size: {end_pos - start_pos} expected range:({start_pos},{end_pos})"
-)
+logger.debug("Ids shape %s, expected size: %d expected range:(%d,%d)", ids.shape, end_pos - start_pos,
+start_pos, end_pos)
 if ids.shape[0] != end_pos - start_pos:
 raise ValueError("Incorrect partition size.")
 
-logger.info(f"Writing data to array: {parts_array_uri}")
+logger.info("Writing data to array: %s", parts_array_uri)
 parts_array[:, start_pos:end_pos] = vectors
-logger.info(f"Writing data to array: {ids_array_uri}")
+logger.info("Writing data to array: %s", ids_array_uri)
 ids_array[start_pos:end_pos] = ids
 parts_array.close()
 ids_array.close()
@@ -1321,7 +1311,7 @@ def consolidate_and_vacuum(
 size = in_size
 logger.debug("Input dataset size %d", size)
 logger.debug("Input dataset dimensions %d", dimensions)
-logger.debug(f"Vector dimension type {vector_type}")
+logger.debug("Vector dimension type %s", vector_type)
 if partitions == -1:
 partitions = int(math.sqrt(size))
 if training_sample_size == -1:
@@ -1331,9 +1321,9 @@ def consolidate_and_vacuum(
 workers = 10
 else:
 workers = 1
-logger.debug(f"Partitions {partitions}")
-logger.debug(f"Training sample size {training_sample_size}")
-logger.debug(f"Number of workers {workers}")
+logger.debug("Partitions %d", partitions)
+logger.debug("Training sample size %d", training_sample_size)
+logger.debug("Number of workers %d", workers)
 
 if input_vectors_per_work_item == -1:
 input_vectors_per_work_item = VECTORS_PER_WORK_ITEM
@@ -1348,10 +1338,7 @@ def consolidate_and_vacuum(
 logger.debug("input_vectors_per_work_item %d", input_vectors_per_work_item)
 logger.debug("input_vectors_work_items %d", input_vectors_work_items)
 logger.debug("input_vectors_work_tasks %d", input_vectors_work_tasks)
-logger.debug(
-"input_vectors_work_items_per_worker %d",
-input_vectors_work_items_per_worker,
-)
+logger.debug("input_vectors_work_items_per_worker %d", input_vectors_work_items_per_worker)
 
 vectors_per_table_partitions = size / partitions
 table_partitions_per_work_item = int(
@@ -1367,15 +1354,10 @@ def consolidate_and_vacuum(
 math.ceil(table_partitions_work_items / MAX_TASKS_PER_STAGE)
 )
 table_partitions_work_tasks = MAX_TASKS_PER_STAGE
-logger.debug(
-"table_partitions_per_work_item %d", table_partitions_per_work_item
-)
+logger.debug("table_partitions_per_work_item %d", table_partitions_per_work_item)
 logger.debug("table_partitions_work_items %d", table_partitions_work_items)
 logger.debug("table_partitions_work_tasks %d", table_partitions_work_tasks)
-logger.debug(
-"table_partitions_work_items_per_worker %d",
-table_partitions_work_items_per_worker,
-)
+logger.debug("table_partitions_work_items_per_worker %d", table_partitions_work_items_per_worker)
 
 logger.debug("Creating arrays")
 create_arrays(
