Skip to content

Commit 9d3cb2f

Browse files
Remove temp data in consolidate_and_vacuum (#321)
Remove temp data in consolidate_and_vacuum
1 parent 8ce2d7f commit 9d3cb2f

File tree

1 file changed

+32
-37
lines changed

1 file changed

+32
-37
lines changed

apis/python/src/tiledb/vector_search/ingestion.py

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2510,43 +2510,38 @@ def consolidate_and_vacuum(
25102510
index_group_uri: str,
25112511
config: Optional[Mapping[str, Any]] = None,
25122512
):
2513-
group = tiledb.Group(index_group_uri)
2514-
try:
2515-
if INPUT_VECTORS_ARRAY_NAME in group:
2516-
tiledb.Array.delete_array(group[INPUT_VECTORS_ARRAY_NAME].uri)
2517-
if EXTERNAL_IDS_ARRAY_NAME in group:
2518-
tiledb.Array.delete_array(group[EXTERNAL_IDS_ARRAY_NAME].uri)
2519-
except tiledb.TileDBError as err:
2520-
message = str(err)
2521-
if "does not exist" not in message:
2522-
raise err
2523-
modes = ["fragment_meta", "commits", "array_meta"]
2524-
for mode in modes:
2525-
conf = tiledb.Config(config)
2526-
conf["sm.consolidation.mode"] = mode
2527-
conf["sm.vacuum.mode"] = mode
2528-
ids_uri = group[IDS_ARRAY_NAME].uri
2529-
parts_uri = group[PARTS_ARRAY_NAME].uri
2530-
tiledb.consolidate(parts_uri, config=conf)
2531-
tiledb.vacuum(parts_uri, config=conf)
2532-
tiledb.consolidate(ids_uri, config=conf)
2533-
tiledb.vacuum(ids_uri, config=conf)
2534-
group.close()
2535-
2536-
# TODO remove temp data for tiledb URIs
2537-
if not index_group_uri.startswith("tiledb://"):
2538-
group = tiledb.Group(index_group_uri, "r")
2539-
if PARTIAL_WRITE_ARRAY_DIR in group:
2540-
group.close()
2541-
group = tiledb.Group(index_group_uri, "w")
2542-
group.remove(PARTIAL_WRITE_ARRAY_DIR)
2543-
vfs = tiledb.VFS(config)
2544-
partial_write_array_dir_uri = (
2545-
index_group_uri + "/" + PARTIAL_WRITE_ARRAY_DIR
2546-
)
2547-
if vfs.is_dir(partial_write_array_dir_uri):
2548-
vfs.remove_dir(partial_write_array_dir_uri)
2549-
group.close()
2513+
with tiledb.Group(index_group_uri) as group:
2514+
try:
2515+
if INPUT_VECTORS_ARRAY_NAME in group:
2516+
tiledb.Array.delete_array(group[INPUT_VECTORS_ARRAY_NAME].uri)
2517+
if EXTERNAL_IDS_ARRAY_NAME in group:
2518+
tiledb.Array.delete_array(group[EXTERNAL_IDS_ARRAY_NAME].uri)
2519+
except tiledb.TileDBError as err:
2520+
message = str(err)
2521+
if "does not exist" not in message:
2522+
raise err
2523+
modes = ["fragment_meta", "commits", "array_meta"]
2524+
for mode in modes:
2525+
conf = tiledb.Config(config)
2526+
conf["sm.consolidation.mode"] = mode
2527+
conf["sm.vacuum.mode"] = mode
2528+
ids_uri = group[IDS_ARRAY_NAME].uri
2529+
parts_uri = group[PARTS_ARRAY_NAME].uri
2530+
tiledb.consolidate(parts_uri, config=conf)
2531+
tiledb.vacuum(parts_uri, config=conf)
2532+
tiledb.consolidate(ids_uri, config=conf)
2533+
tiledb.vacuum(ids_uri, config=conf)
2534+
partial_write_array_exists = PARTIAL_WRITE_ARRAY_DIR in group
2535+
if partial_write_array_exists:
2536+
with tiledb.Group(index_group_uri, "w") as partial_write_array_group:
2537+
partial_write_array_group.remove(PARTIAL_WRITE_ARRAY_DIR)
2538+
partial_write_array_dir_uri = (
2539+
index_group_uri + "/" + PARTIAL_WRITE_ARRAY_DIR
2540+
)
2541+
with tiledb.Group(
2542+
partial_write_array_dir_uri, "m"
2543+
) as partial_write_array_group:
2544+
partial_write_array_group.delete(recursive=True)
25502545

25512546
# --------------------------------------------------------------------
25522547
# End internal function definitions

0 commit comments

Comments
 (0)