Skip to content

Commit 1e27a26

Browse files
authored
Update deploy script for composite sub-flows (#583)
* Update deploy script for composite sub-flows, fix materialize_snapshot FK order - deploy_huey_flow.py: Deploy 3 sub-flows (profile, preferences, recommendation) before the main flow, resolve composite_flow_seed_key references to production UUIDs, add --skip-subflows flag - flow_snapshot.py: Delete connections before nodes in materialize_snapshot to respect FK constraints (connections reference nodes) * Abort deploy when composite seed keys cannot be resolved Unresolved composite_flow_seed_key references produce broken flows at runtime. Fail fast instead of deploying silently broken state.
1 parent 7ff67e5 commit 1e27a26

File tree

2 files changed

+475
-15
lines changed

2 files changed

+475
-15
lines changed

app/services/flow_snapshot.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -156,18 +156,6 @@ async def materialize_snapshot(
156156
if node_id:
157157
new_node_ids.add(node_id)
158158

159-
# Delete nodes not in the new snapshot
160-
if new_node_ids:
161-
await session.execute(
162-
delete(FlowNode).where(
163-
FlowNode.flow_id == flow_id,
164-
~FlowNode.node_id.in_(new_node_ids),
165-
)
166-
)
167-
else:
168-
# If no nodes in snapshot, delete all nodes for this flow
169-
await session.execute(delete(FlowNode).where(FlowNode.flow_id == flow_id))
170-
171159
# Collect connection identifiers from the new snapshot
172160
new_connections = set()
173161
for conn in connections:
@@ -184,9 +172,8 @@ async def materialize_snapshot(
184172
if source and target:
185173
new_connections.add((source, target, ctype))
186174

187-
# Delete connections not in the new snapshot
175+
# Delete connections before nodes (connections have FKs referencing nodes)
188176
if new_connections:
189-
# Get existing connections for this flow
190177
result = await session.execute(
191178
select(
192179
FlowConnection.id,
@@ -206,11 +193,21 @@ async def materialize_snapshot(
206193
delete(FlowConnection).where(FlowConnection.id.in_(ids_to_delete))
207194
)
208195
else:
209-
# If no connections in snapshot, delete all connections for this flow
210196
await session.execute(
211197
delete(FlowConnection).where(FlowConnection.flow_id == flow_id)
212198
)
213199

200+
# Delete nodes not in the new snapshot
201+
if new_node_ids:
202+
await session.execute(
203+
delete(FlowNode).where(
204+
FlowNode.flow_id == flow_id,
205+
~FlowNode.node_id.in_(new_node_ids),
206+
)
207+
)
208+
else:
209+
await session.execute(delete(FlowNode).where(FlowNode.flow_id == flow_id))
210+
214211
await session.flush()
215212

216213
# Upsert nodes

0 commit comments

Comments
 (0)