Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions app/services/flow_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,6 @@ async def materialize_snapshot(
if node_id:
new_node_ids.add(node_id)

# Delete nodes not in the new snapshot
if new_node_ids:
await session.execute(
delete(FlowNode).where(
FlowNode.flow_id == flow_id,
~FlowNode.node_id.in_(new_node_ids),
)
)
else:
# If no nodes in snapshot, delete all nodes for this flow
await session.execute(delete(FlowNode).where(FlowNode.flow_id == flow_id))

# Collect connection identifiers from the new snapshot
new_connections = set()
for conn in connections:
Expand All @@ -184,9 +172,8 @@ async def materialize_snapshot(
if source and target:
new_connections.add((source, target, ctype))

# Delete connections not in the new snapshot
# Delete connections before nodes (connections have FKs referencing nodes)
if new_connections:
# Get existing connections for this flow
result = await session.execute(
select(
FlowConnection.id,
Expand All @@ -206,11 +193,21 @@ async def materialize_snapshot(
delete(FlowConnection).where(FlowConnection.id.in_(ids_to_delete))
)
else:
# If no connections in snapshot, delete all connections for this flow
await session.execute(
delete(FlowConnection).where(FlowConnection.flow_id == flow_id)
)

# Delete nodes not in the new snapshot
if new_node_ids:
await session.execute(
delete(FlowNode).where(
FlowNode.flow_id == flow_id,
~FlowNode.node_id.in_(new_node_ids),
)
)
else:
await session.execute(delete(FlowNode).where(FlowNode.flow_id == flow_id))

await session.flush()

# Upsert nodes
Expand Down
Loading