Skip to content

Commit 0d0c17f

Browse files
authored
Fix api_call actions running async in production, breaking book recommendations (#584)
In production (GCP), api_call actions were enqueued to Cloud Tasks and processed asynchronously. This caused the recommendation sub-flow to return without book results — the flow continued to "Happy reading!" without ever showing books. Three fixes: - Remove api_call from async_actions set in ActionNodeProcessor so book recommendation queries run synchronously within the chat flow - Respect wait_for_acknowledgment in composite sub-flow processing loop - Flatten message results and handle COMPOSITE node type in parent flow return processing to prevent nested message dicts in responses
1 parent 1e27a26 commit 0d0c17f

File tree

3 files changed

+46
-3
lines changed

3 files changed

+46
-3
lines changed

app/services/action_processor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,11 @@ async def process(
8585
node_content = node.content or {}
8686
actions = node_content.get("actions", [])
8787

88-
# Determine if this should be processed asynchronously
89-
async_actions = {"api_call", "external_service", "heavy_computation"}
88+
# Determine if this should be processed asynchronously.
89+
# Note: api_call is intentionally excluded — chat flows need synchronous
90+
# results (e.g. book recommendations) so the flow can continue immediately.
91+
# Use the explicit "async": true flag on individual actions if needed.
92+
async_actions = {"external_service", "heavy_computation"}
9093
should_async = any(
9194
action.get("type") in async_actions or action.get("async", False)
9295
for action in actions

app/services/chat_runtime.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1657,6 +1657,23 @@ async def _try_return_to_parent_flow(
16571657
else:
16581658
result["messages"].append(return_result)
16591659

1660+
# Pause if the return node (e.g. composite sub-flow) requests ack.
1661+
# Save the session position at the paused node in the sub-flow so
1662+
# that the MESSAGE branch of process_interaction can resume correctly.
1663+
if return_result.get("wait_for_acknowledgment"):
1664+
paused_node_id = return_result.get("node_id")
1665+
if paused_node_id:
1666+
session = await chat_repo.update_session_state(
1667+
db,
1668+
session_id=session.id,
1669+
state_updates={},
1670+
current_node_id=paused_node_id,
1671+
expected_revision=session.revision,
1672+
)
1673+
result["wait_for_acknowledgment"] = True
1674+
result["current_node_id"] = paused_node_id
1675+
return result
1676+
16601677
# Continue processing chain until question or end.
16611678
# Track the result that produced next_node so we can
16621679
# extract sub_flow_id from the correct source.
@@ -1688,12 +1705,24 @@ async def _try_return_to_parent_flow(
16881705
NodeType.MESSAGE,
16891706
NodeType.ACTION,
16901707
NodeType.CONDITION,
1708+
NodeType.COMPOSITE,
16911709
):
16921710
node_result = await self.process_node(db, next_node, session)
16931711
session = await self._refresh_session(db, session)
16941712
if node_result:
1695-
result["messages"].append(node_result)
1713+
if node_result.get("type") == "messages":
1714+
result["messages"].extend(
1715+
node_result.get("messages", [])
1716+
)
1717+
else:
1718+
result["messages"].append(node_result)
16961719
source_result = node_result
1720+
1721+
# Pause if the result requests acknowledgment
1722+
if node_result and node_result.get("wait_for_acknowledgment"):
1723+
result["wait_for_acknowledgment"] = True
1724+
break
1725+
16971726
next_node = (
16981727
node_result.get("next_node") if node_result else None
16991728
)

app/services/node_processors.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,17 @@ async def _process_subflow(
663663
messages = result.get("messages", [])
664664
all_messages.extend(messages)
665665

666+
# Pause processing if the message requests acknowledgment
667+
if result.get("wait_for_acknowledgment"):
668+
final_result = {
669+
"type": "messages",
670+
"messages": all_messages,
671+
"wait_for_acknowledgment": True,
672+
"node_id": result.get("node_id"),
673+
"next_node": result.get("next_node"),
674+
}
675+
break
676+
666677
# Check if there's a next node to process
667678
next_node = result.get("next_node")
668679
if next_node:

0 commit comments

Comments
 (0)