
Commit fd7b205

Author: Nikos Papailiou
Parent: dc5bcba

Clean up ingestion info output and use debug

1 file changed (+46 -45 lines)

apis/python/src/tiledb/vector_search/ingestion.py

Lines changed: 46 additions & 45 deletions
@@ -197,7 +197,7 @@ def create_arrays(
         if index_type == "FLAT":
             parts_uri = f"{group.uri}/{PARTS_ARRAY_NAME}"
             if not tiledb.array_exists(parts_uri):
-                logger.info("Creating parts array")
+                logger.debug("Creating parts array")
                 parts_array_rows_dim = tiledb.Dim(
                     name="rows",
                     domain=(0, dimensions - 1),
@@ -222,7 +222,7 @@ def create_arrays(
                     cell_order="col-major",
                     tile_order="col-major",
                 )
-                logger.info(parts_schema)
+                logger.debug(parts_schema)
                 tiledb.Array.create(parts_uri, parts_schema)
                 group.add(PARTS_ARRAY_NAME, name=PARTS_ARRAY_NAME, relative=True)

@@ -243,7 +243,7 @@ def create_arrays(
            )

            if not tiledb.array_exists(centroids_uri):
-                logger.info("Creating centroids array")
+                logger.debug("Creating centroids array")
                 centroids_array_rows_dim = tiledb.Dim(
                     name="rows",
                     domain=(0, dimensions - 1),
@@ -270,14 +270,14 @@ def create_arrays(
                     cell_order="col-major",
                     tile_order="col-major",
                 )
-                logger.info(centroids_schema)
+                logger.debug(centroids_schema)
                 tiledb.Array.create(centroids_uri, centroids_schema)
                 group.add(
                     CENTROIDS_ARRAY_NAME, name=CENTROIDS_ARRAY_NAME, relative=True
                 )

             if not tiledb.array_exists(index_uri):
-                logger.info("Creating index array")
+                logger.debug("Creating index array")
                 index_array_rows_dim = tiledb.Dim(
                     name="rows",
                     domain=(0, partitions),
@@ -294,12 +294,12 @@ def create_arrays(
                     cell_order="col-major",
                     tile_order="col-major",
                 )
-                logger.info(index_schema)
+                logger.debug(index_schema)
                 tiledb.Array.create(index_uri, index_schema)
                 group.add(INDEX_ARRAY_NAME, name=INDEX_ARRAY_NAME, relative=True)

             if not tiledb.array_exists(ids_uri):
-                logger.info("Creating ids array")
+                logger.debug("Creating ids array")
                 ids_array_rows_dim = tiledb.Dim(
                     name="rows",
                     domain=(0, size - 1),
@@ -316,12 +316,12 @@ def create_arrays(
                     cell_order="col-major",
                     tile_order="col-major",
                 )
-                logger.info(ids_schema)
+                logger.debug(ids_schema)
                 tiledb.Array.create(ids_uri, ids_schema)
                 group.add(IDS_ARRAY_NAME, name=IDS_ARRAY_NAME, relative=True)

             if not tiledb.array_exists(parts_uri):
-                logger.info("Creating parts array")
+                logger.debug("Creating parts array")
                 parts_array_rows_dim = tiledb.Dim(
                     name="rows",
                     domain=(0, dimensions - 1),
@@ -346,7 +346,7 @@ def create_arrays(
                     cell_order="col-major",
                     tile_order="col-major",
                 )
-                logger.info(parts_schema)
+                logger.debug(parts_schema)
                 tiledb.Array.create(parts_uri, parts_schema)
                 group.add(PARTS_ARRAY_NAME, name=PARTS_ARRAY_NAME, relative=True)

@@ -359,7 +359,7 @@ def create_arrays(
             vfs.create_dir(partial_write_array_index_uri)

             if not tiledb.array_exists(partial_write_array_ids_uri):
-                logger.info("Creating temp ids array")
+                logger.debug("Creating temp ids array")
                 ids_array_rows_dim = tiledb.Dim(
                     name="rows",
                     domain=(0, size - 1),
@@ -376,11 +376,11 @@ def create_arrays(
                     cell_order="col-major",
                     tile_order="col-major",
                 )
-                logger.info(ids_schema)
+                logger.debug(ids_schema)
                 tiledb.Array.create(partial_write_array_ids_uri, ids_schema)

             if not tiledb.array_exists(partial_write_array_parts_uri):
-                logger.info("Creating temp parts array")
+                logger.debug("Creating temp parts array")
                 parts_array_rows_dim = tiledb.Dim(
                     name="rows",
                     domain=(0, dimensions - 1),
@@ -405,8 +405,8 @@ def create_arrays(
                     cell_order="col-major",
                     tile_order="col-major",
                 )
-                logger.info(parts_schema)
-                logger.info(partial_write_array_parts_uri)
+                logger.debug(parts_schema)
+                logger.debug(partial_write_array_parts_uri)
                 tiledb.Array.create(partial_write_array_parts_uri, parts_schema)
         else:
             raise ValueError(f"Not supported index_type {index_type}")
@@ -519,7 +519,7 @@ def copy_centroids(
         dest = tiledb.open(centroids_uri, mode="w")
         src_centroids = src[:, :]
         dest[:, :] = src_centroids
-        logger.info(src_centroids)
+        logger.debug(src_centroids)

     # --------------------------------------------------------------------
     # centralised kmeans UDFs
@@ -632,7 +632,7 @@ def generate_new_centroid_per_thread(
             new_centroid_count = np.ones(len(cents_t))
             for vector_id in range(start, end):
                 if vector_id % 100000 == 0:
-                    logger.info(f"Vectors computed: {vector_id}")
+                    logger.debug(f"Vectors computed: {vector_id}")
                 c_id = assignments_t[vector_id]
                 if new_centroid_count[c_id] == 1:
                     new_centroid_sums[c_id] = vectors_t[vector_id]
@@ -642,7 +642,7 @@ def generate_new_centroid_per_thread(
                     new_centroid_count[c_id] += 1
             new_centroid_sums_queue.put(new_centroid_sums)
             new_centroid_counts_queue.put(new_centroid_count)
-            logger.info(f"Finished thread: {thread_id}")
+            logger.debug(f"Finished thread: {thread_id}")

         def update_centroids():
             import multiprocessing as mp
@@ -715,15 +715,15 @@ def update_centroids():
             verbose=verbose,
             trace_id=trace_id,
         )
-        logger.info(f"Input centroids: {centroids[0:5]}")
+        logger.debug(f"Input centroids: {centroids[0:5]}")
         logger.info("Assigning vectors to centroids")
         km = KMeans()
         km._n_threads = threads
         km.cluster_centers_ = centroids
         assignments = km.predict(vectors)
-        logger.info(f"Assignments: {assignments[0:100]}")
+        logger.debug(f"Assignments: {assignments[0:100]}")
         partial_new_centroids = update_centroids()
-        logger.info(f"New centroids: {partial_new_centroids[0:5]}")
+        logger.debug(f"New centroids: {partial_new_centroids[0:5]}")
         return partial_new_centroids

     def compute_new_centroids(*argv):
@@ -771,7 +771,7 @@ def ingest_flat(
             trace_id=trace_id,
         )

-        logger.info(f"Vector read:{len(in_vectors)}")
+        logger.debug(f"Vector read:{len(in_vectors)}")
         logger.info(f"Writing data to array {parts_array_uri}")
         target[0:dimensions, start:end] = np.transpose(in_vectors)
         target.close()
@@ -838,7 +838,7 @@ def ingest_vectors_udf(
             )
             logger.info(f"Input vectors start_pos: {part}, end_pos: {part_end}")
             if source_type == "TILEDB_ARRAY":
-                logger.info("Start indexing")
+                logger.debug("Start indexing")
                 ivf_index_tdb(
                     dtype=vector_type,
                     db_uri=source_uri,
@@ -863,7 +863,7 @@ def ingest_vectors_udf(
                     verbose=verbose,
                     trace_id=trace_id,
                 )
-                logger.info("Start indexing")
+                logger.debug("Start indexing")
                 ivf_index(
                     dtype=vector_type,
                     db=array_to_matrix(np.transpose(in_vectors).astype(vector_type)),
@@ -911,7 +911,7 @@ def compute_partition_indexes_udf(
             sum += partition_size
             i += 1
             indexes[i] = sum
-        logger.info(f"Partition indexes: {indexes}")
+        logger.debug(f"Partition indexes: {indexes}")
         index_array = tiledb.open(index_array_uri, mode="w")
         index_array[:] = indexes

@@ -969,7 +969,7 @@ def consolidate_partition_udf(
         index_array = tiledb.open(index_array_uri, mode="r")
         ids_array = tiledb.open(ids_array_uri, mode="w")
         parts_array = tiledb.open(parts_array_uri, mode="w")
-        logger.info(
+        logger.debug(
             f"Partitions start: {partition_id_start} end: {partition_id_end}"
         )
         for part in range(partition_id_start, partition_id_end, batch):
@@ -1308,8 +1308,7 @@ def consolidate_and_vacuum(
             message = str(err)
             if "already exists" in message:
                 logger.info(f"Group '{array_uri}' already exists")
-            else:
-                raise err
+            raise err
         group = tiledb.Group(array_uri, "w")
         group.meta["dataset_type"] = "vector_search"

@@ -1320,9 +1319,9 @@ def consolidate_and_vacuum(
             size = in_size
         if size > in_size:
             size = in_size
-        logger.info("Input dataset size %d", size)
-        logger.info("Input dataset dimensions %d", dimensions)
-        logger.info(f"Vector dimension type {vector_type}")
+        logger.debug("Input dataset size %d", size)
+        logger.debug("Input dataset dimensions %d", dimensions)
+        logger.debug(f"Vector dimension type {vector_type}")
         if partitions == -1:
             partitions = int(math.sqrt(size))
         if training_sample_size == -1:
@@ -1332,9 +1331,9 @@ def consolidate_and_vacuum(
             workers = 10
         else:
             workers = 1
-        logger.info(f"Partitions {partitions}")
-        logger.info(f"Training sample size {training_sample_size}")
-        logger.info(f"Number of workers {workers}")
+        logger.debug(f"Partitions {partitions}")
+        logger.debug(f"Training sample size {training_sample_size}")
+        logger.debug(f"Number of workers {workers}")

         if input_vectors_per_work_item == -1:
             input_vectors_per_work_item = VECTORS_PER_WORK_ITEM
@@ -1346,10 +1345,10 @@ def consolidate_and_vacuum(
                 math.ceil(input_vectors_work_items / MAX_TASKS_PER_STAGE)
             )
             input_vectors_work_tasks = MAX_TASKS_PER_STAGE
-        logger.info("input_vectors_per_work_item %d", input_vectors_per_work_item)
-        logger.info("input_vectors_work_items %d", input_vectors_work_items)
-        logger.info("input_vectors_work_tasks %d", input_vectors_work_tasks)
-        logger.info(
+        logger.debug("input_vectors_per_work_item %d", input_vectors_per_work_item)
+        logger.debug("input_vectors_work_items %d", input_vectors_work_items)
+        logger.debug("input_vectors_work_tasks %d", input_vectors_work_tasks)
+        logger.debug(
             "input_vectors_work_items_per_worker %d",
             input_vectors_work_items_per_worker,
         )
@@ -1368,15 +1367,17 @@ def consolidate_and_vacuum(
                 math.ceil(table_partitions_work_items / MAX_TASKS_PER_STAGE)
             )
             table_partitions_work_tasks = MAX_TASKS_PER_STAGE
-        logger.info("table_partitions_per_work_item %d", table_partitions_per_work_item)
-        logger.info("table_partitions_work_items %d", table_partitions_work_items)
-        logger.info("table_partitions_work_tasks %d", table_partitions_work_tasks)
-        logger.info(
+        logger.debug(
+            "table_partitions_per_work_item %d", table_partitions_per_work_item
+        )
+        logger.debug("table_partitions_work_items %d", table_partitions_work_items)
+        logger.debug("table_partitions_work_tasks %d", table_partitions_work_tasks)
+        logger.debug(
             "table_partitions_work_items_per_worker %d",
             table_partitions_work_items_per_worker,
         )

-        logger.info("Creating arrays")
+        logger.debug("Creating arrays")
         create_arrays(
             group=group,
             index_type=index_type,
@@ -1389,7 +1390,7 @@ def consolidate_and_vacuum(
         )
         group.close()

-        logger.info("Creating ingestion graph")
+        logger.debug("Creating ingestion graph")
         d = create_ingestion_dag(
             index_type=index_type,
             array_uri=array_uri,
@@ -1411,7 +1412,7 @@ def consolidate_and_vacuum(
             trace_id=trace_id,
             mode=mode,
         )
-        logger.info("Submitting ingestion graph")
+        logger.debug("Submitting ingestion graph")
         d.compute()
         logger.info("Submitted ingestion graph")
         d.wait()
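
Note on the effect of this commit: schema dumps, sample centroid/assignment values, and work-item accounting are demoted from INFO to DEBUG, so the default log output only reports high-level ingestion progress. As a minimal sketch of how a caller could still surface the demoted messages, the snippet below raises the level of the ingestion module's logger with the standard Python logging API. The logger name used here is an assumption (a module-path logger for ingestion.py); if the package instead routes log configuration through its verbose flag, that mechanism should be preferred.

import logging

# Assumption: ingestion.py logs through a standard logging.Logger keyed by its
# module path. Adjust the name if the package configures logging differently.
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logging.getLogger("tiledb.vector_search.ingestion").setLevel(logging.DEBUG)

# With DEBUG enabled, messages demoted by this commit ("Creating parts array",
# "input_vectors_work_items ...", schema dumps, etc.) are emitted again; at the
# default level only the remaining logger.info() calls (e.g. "Assigning vectors
# to centroids") are shown.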
