Skip to content

Commit 6f56cf5

Browse files
authored
Fix Node Deletion (#1283)
* FIX: refactor the delete method * FIX: refactor SQL * MNT: init alembic migration scripts * TST: fix tests * ENH: migration scripts * ENH: remove rebuilding of foreign key constraints for sqlite * FIX: Skip changing the constraint for SQLite * MNT: fmt and lint * MNT: update changelog * TST: revert comment * TST: revert comment * TST: revert comment * MNT: remove breakpoint * FIX: drop index in alembic migration * FIX: condition to delete storage assets * FIX: remove unnecessary index
1 parent b5b4040 commit 6f56cf5

File tree

6 files changed

+385
-84
lines changed

6 files changed

+385
-84
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ Write the date in place of the "Unreleased" in the case a new version is release
2222

2323
- Error handling in `tiled.client.download._download_url`.
2424

25+
### Fixed
26+
27+
- Slow performance when deleting nodes in large catalogs with many nodes.
28+
2529
## v0.2.3 (2025-12-17)
2630

2731
### Added

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ def poll_enumerate():
140140
# docker run --name tiled-test-postgres -p 5432:5432 -e POSTGRES_PASSWORD=secret -d docker.io/postgres:16
141141
# and set this env var like:
142142
#
143-
# TILED_TEST_POSTGRESQL_URI=postgresql+asyncpg://postgres:secret@localhost:5432
143+
# TILED_TEST_POSTGRESQL_URI=postgresql://postgres:secret@localhost:5432
144144

145145
TILED_TEST_POSTGRESQL_URI = os.getenv("TILED_TEST_POSTGRESQL_URI")
146146

tiled/catalog/adapter.py

Lines changed: 182 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import anyio
2020
from fastapi import HTTPException
2121
from sqlalchemy import (
22-
and_,
2322
delete,
2423
exists,
2524
false,
@@ -902,18 +901,39 @@ async def revisions(self, offset, limit):
902901
return [Revision.from_orm(o[0]) for o in revision_orms]
903902

904903
async def delete(self, recursive=False, external_only=True):
905-
"""Delete the Node.
904+
"""Delete the Node, its descendants, and associated DataSources and Assets
906905
907906
Any DataSources belonging to this Node and any Assets associated (only) with
908-
those DataSources will also be deleted.
907+
those DataSources will also be deleted. Assets shared with other Nodes will
908+
be retained.
909909
910910
If `recursive` is True, delete all Nodes beneath this Node in the tree.
911+
912+
If `external_only` is True, refuse to delete if any internally-managed
913+
data sources are present in the subtree.
914+
915+
Parameters
916+
----------
917+
recursive : bool, optional
918+
Safety check: if False, refuse to delete if the Node has any children.
919+
external_only : bool, optional
920+
Safety check: if True, refuse to delete if any internally-managed data
921+
sources are present in the subtree.
922+
923+
Returns
924+
-------
925+
int
926+
The number of Asset records deleted.
911927
"""
928+
912929
async with self.context.session() as db:
930+
# Safety check: non-recursive delete must have no children
913931
if not recursive:
914932
has_children_stmt = select(
915-
exists().where(
916-
and_(
933+
exists(
934+
select(1)
935+
.select_from(orm.NodesClosure)
936+
.where(
917937
orm.NodesClosure.ancestor == self.node.id,
918938
orm.NodesClosure.descendant != self.node.id,
919939
)
@@ -925,88 +945,176 @@ async def delete(self, recursive=False, external_only=True):
925945
"Delete its contents first or pass `recursive=True`."
926946
)
927947

928-
affected_nodes_stmnt = (
948+
# Affected nodes CTE
949+
affected_nodes_cte = (
929950
select(orm.NodesClosure.descendant)
930951
.where(orm.NodesClosure.ancestor == self.node.id)
931-
.distinct()
932-
.scalar_subquery()
952+
.cte("affected_nodes")
933953
)
954+
955+
# Safety check: refuse to delete any internally-managed data sources
934956
if external_only:
935957
int_asset_exists_stmt = select(
936-
exists()
937-
.where(orm.Asset.id == orm.DataSourceAssetAssociation.asset_id)
938-
.where(
939-
orm.DataSourceAssetAssociation.data_source_id
940-
== orm.DataSource.id
958+
exists(
959+
select(1)
960+
.select_from(orm.DataSource)
961+
.where(
962+
orm.DataSource.node_id.in_(
963+
select(affected_nodes_cte.c.descendant)
964+
),
965+
orm.DataSource.management != Management.external,
966+
)
941967
)
942-
.where(orm.DataSource.node_id.in_(affected_nodes_stmnt))
943-
.where(orm.DataSource.management != Management.external)
944968
)
945-
946969
if (await db.execute(int_asset_exists_stmt)).scalar():
947970
raise WouldDeleteData(
948971
"Some items in this tree are internally managed. "
949972
"Deleting the records will also delete the underlying data files. "
950973
"If you want to delete them, pass external_only=False."
951974
)
952975

953-
sel_asset_stmnt = (
976+
# Find storage database entries that can be deleted (without deleting the
977+
# entire storage DB asset)
978+
deleted_from_storage_stmt = (
954979
select(
955980
orm.Asset.id,
956981
orm.Asset.data_uri,
957-
orm.Asset.is_directory,
958-
orm.DataSource.management,
959-
orm.DataSource.parameters,
982+
# keep only table_name and dataset_id from parameters
983+
orm.DataSource.parameters["table_name"].label("table_name"),
984+
orm.DataSource.parameters["dataset_id"].label("dataset_id"),
960985
)
961-
.select_from(orm.Asset)
986+
.select_from(orm.DataSourceAssetAssociation)
962987
.join(
963-
orm.Asset.data_sources
964-
) # Join on secondary (mapping) relationship
965-
.join(orm.DataSource.node)
966-
.filter(orm.Node.id.in_(affected_nodes_stmnt))
988+
orm.DataSource,
989+
orm.DataSource.id == orm.DataSourceAssetAssociation.data_source_id,
990+
)
991+
.join(
992+
orm.NodesClosure,
993+
orm.NodesClosure.descendant == orm.DataSource.node_id,
994+
)
995+
.join(
996+
orm.Asset,
997+
orm.Asset.id == orm.DataSourceAssetAssociation.asset_id,
998+
)
999+
.where(orm.NodesClosure.ancestor == self.node.id)
1000+
# Select only data sources related to storage DBs
1001+
.where(
1002+
or_(
1003+
orm.Asset.data_uri.startswith("postgresql"),
1004+
orm.Asset.data_uri.startswith("sqlite"),
1005+
orm.Asset.data_uri.startswith("duckdb"),
1006+
)
1007+
)
9671008
.distinct()
9681009
)
969-
970-
assets_to_delete = []
971-
for asset_id, data_uri, is_directory, management, parameters in (
972-
await db.execute(sel_asset_stmnt)
973-
).all():
974-
# Check if this asset is referenced by other UNAFFECTED nodes
975-
is_referenced = select(
976-
exists()
1010+
deleted_from_storage = (await db.execute(deleted_from_storage_stmt)).all()
1011+
1012+
# Delete Asset records from the table: This will delete all assets that are
1013+
# referenced in the subtree, but only if they are not referenced elsewhere.
1014+
if self.context.engine.dialect.name == "postgresql":
1015+
delete_assets_stmt = text(
1016+
"""
1017+
WITH deletable_assets AS (
1018+
SELECT DISTINCT
1019+
daa.asset_id AS id,
1020+
ds.management AS management
1021+
FROM data_source_asset_association daa
1022+
JOIN data_sources ds ON ds.id = daa.data_source_id
1023+
JOIN nodes_closure nc ON nc.descendant = ds.node_id
1024+
WHERE nc.ancestor = :node_id
1025+
AND NOT EXISTS (
1026+
SELECT 1
1027+
FROM data_source_asset_association daa2
1028+
JOIN data_sources ds2 ON ds2.id = daa2.data_source_id
1029+
WHERE daa2.asset_id = daa.asset_id
1030+
AND ds2.node_id NOT IN (
1031+
SELECT descendant
1032+
FROM nodes_closure
1033+
WHERE ancestor = :node_id
1034+
)
1035+
)
1036+
)
1037+
DELETE FROM assets
1038+
USING deletable_assets AS da
1039+
WHERE assets.id = da.id
1040+
RETURNING assets.id, assets.data_uri, assets.is_directory, da.management;
1041+
"""
1042+
)
1043+
deleted_asset_records = (
1044+
await db.execute(delete_assets_stmt.params(node_id=self.node.id))
1045+
).all()
1046+
else:
1047+
deletable_assets_stmt = (
1048+
select(
1049+
orm.Asset.id,
1050+
orm.Asset.data_uri,
1051+
orm.Asset.is_directory,
1052+
orm.DataSource.management,
1053+
)
1054+
.select_from(orm.DataSourceAssetAssociation)
1055+
.join(
1056+
orm.Asset,
1057+
orm.Asset.id == orm.DataSourceAssetAssociation.asset_id,
1058+
)
1059+
.join(
1060+
orm.DataSource,
1061+
orm.DataSource.id
1062+
== orm.DataSourceAssetAssociation.data_source_id,
1063+
)
9771064
.where(
978-
orm.Asset.id == asset_id,
979-
orm.Asset.data_sources.any(
980-
orm.DataSource.node_id.notin_(affected_nodes_stmnt)
981-
),
1065+
# Asset is referenced by at least one data source in the affected nodes
1066+
orm.DataSource.node_id.in_(
1067+
select(affected_nodes_cte.c.descendant)
1068+
)
9821069
)
983-
.distinct()
1070+
.where(
1071+
# Asset is NOT referenced by any data source outside the affected nodes
1072+
~exists(
1073+
select(1)
1074+
.select_from(orm.DataSourceAssetAssociation)
1075+
.join(orm.DataSource)
1076+
.where(
1077+
orm.DataSourceAssetAssociation.asset_id == orm.Asset.id,
1078+
orm.DataSource.node_id.notin_(
1079+
select(affected_nodes_cte.c.descendant)
1080+
),
1081+
)
1082+
# Treat Asset as coming from the outer query
1083+
.correlate(orm.Asset)
1084+
)
1085+
)
1086+
.distinct() # may be needed if multiple data_sources point to the same asset
9841087
)
985-
if not (await db.execute(is_referenced)).scalar():
986-
# This asset is referenced only by AFFECTED nodes, so we can delete it
987-
await db.execute(delete(orm.Asset).where(orm.Asset.id == asset_id))
988-
if management != Management.external:
989-
assets_to_delete.append((data_uri, is_directory, parameters))
990-
elif (management == Management.writable) and (
991-
urlparse(data_uri).scheme in {"duckdb", "sqlite", "postgresql"}
992-
):
993-
# The tabular storage asset may be referenced by several data_sources
994-
# and nodes, so we cannot delete it completely. However, we can delete
995-
# the relevant rows and tables.
996-
assets_to_delete.append((data_uri, is_directory, parameters))
9971088

998-
result = await db.execute(
1089+
# Gather asset records before deleting them, recombine parameters
1090+
deleted_asset_records = (await db.execute(deletable_assets_stmt)).all()
1091+
1092+
# Now delete the gathered assets, if any
1093+
if asset_ids := set(record[0] for record in deleted_asset_records):
1094+
await db.execute(
1095+
delete(orm.Asset).where(orm.Asset.id.in_(asset_ids))
1096+
)
1097+
1098+
# Delete Nodes (deletes all descendants and closure entries via cascade)
1099+
await db.execute(
9991100
delete(orm.Node)
1000-
.where(orm.Node.id.in_(affected_nodes_stmnt))
1101+
.where(orm.Node.id.in_(select(affected_nodes_cte.c.descendant)))
10011102
.where(orm.Node.parent.isnot(None))
10021103
)
10031104
await db.commit()
10041105

1005-
# Finally, delete the physical assets that are not externally managed
1006-
for data_uri, is_directory, parameters in assets_to_delete:
1007-
delete_asset(data_uri, is_directory, parameters=parameters)
1106+
# Physical deletion -- outside database transaction
1107+
# Delete assets backed by files and blobs written by Tiled
1108+
for asset_id, data_uri, is_directory, management in deleted_asset_records:
1109+
if management != Management.external:
1110+
delete_physical_asset(data_uri, is_directory=is_directory)
1111+
# Delete storage database entries (management == writable, always)
1112+
for asset_id, data_uri, table_name, dataset_id in deleted_from_storage:
1113+
delete_physical_asset(
1114+
data_uri, table_name=table_name, dataset_id=dataset_id
1115+
)
10081116

1009-
return result.rowcount
1117+
return len(set(record[0] for record in deleted_asset_records))
10101118

10111119
async def delete_revision(self, number):
10121120
async with self.context.session() as db:
@@ -1349,7 +1457,9 @@ async def append_partition(self, media_type, deserializer, entry, body, partitio
13491457
)
13501458

13511459

1352-
def delete_asset(data_uri, is_directory, parameters=None):
1460+
def delete_physical_asset(
1461+
data_uri, is_directory=False, table_name=None, dataset_id=None
1462+
):
13531463
url = urlparse(data_uri)
13541464
if url.scheme == "file":
13551465
path = path_from_uri(data_uri)
@@ -1358,22 +1468,21 @@ def delete_asset(data_uri, is_directory, parameters=None):
13581468
else:
13591469
Path(path).unlink()
13601470
elif url.scheme in {"duckdb", "sqlite", "postgresql"}:
1361-
storage = cast(SQLStorage, get_storage(data_uri))
1362-
with closing(storage.connect()) as conn:
1363-
table_name = parameters.get("table_name") if parameters else None
1364-
dataset_id = parameters.get("dataset_id") if parameters else None
1365-
with conn.cursor() as cursor:
1366-
cursor.execute(
1367-
f'DELETE FROM "{table_name}" WHERE _dataset_id = {dataset_id:d};',
1368-
)
1369-
conn.commit()
1370-
1371-
# If the table is empty, we can drop it
1372-
with conn.cursor() as cursor:
1373-
cursor.execute(f'SELECT COUNT(*) FROM "{table_name}";')
1374-
if cursor.fetchone()[0] == 0:
1375-
cursor.execute(f'DROP TABLE IF EXISTS "{table_name}";')
1376-
conn.commit()
1471+
if (table_name is not None) and (dataset_id is not None):
1472+
storage = cast(SQLStorage, get_storage(data_uri))
1473+
with closing(storage.connect()) as conn:
1474+
with conn.cursor() as cursor:
1475+
cursor.execute(
1476+
f'DELETE FROM "{table_name}" WHERE _dataset_id = {dataset_id:d};',
1477+
)
1478+
conn.commit()
1479+
1480+
# If the table is empty, we can drop it
1481+
with conn.cursor() as cursor:
1482+
cursor.execute(f'SELECT COUNT(*) FROM "{table_name}";')
1483+
if cursor.fetchone()[0] == 0:
1484+
cursor.execute(f'DROP TABLE IF EXISTS "{table_name}";')
1485+
conn.commit()
13771486

13781487
elif url.scheme in SUPPORTED_OBJECT_URI_SCHEMES:
13791488
storage = cast(ObjectStorage, get_storage(data_uri))

tiled/catalog/core.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
# This is list of all valid revisions (from current to oldest).
88
ALL_REVISIONS = [
9+
"a86a48befff6",
910
"dfbb7478c6bd",
1011
"a963a6c32a0c",
1112
"e05e918092c3",

0 commit comments

Comments
 (0)