Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions app/services/action_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ async def process(
node_content = node.content or {}
actions = node_content.get("actions", [])

# Determine if this should be processed asynchronously
async_actions = {"api_call", "external_service", "heavy_computation"}
# Determine if this should be processed asynchronously.
# Note: api_call is intentionally excluded — chat flows need synchronous
# results (e.g. book recommendations) so the flow can continue immediately.
# Use the explicit "async": true flag on individual actions if needed.
async_actions = {"external_service", "heavy_computation"}
should_async = any(
action.get("type") in async_actions or action.get("async", False)
for action in actions
Expand Down
31 changes: 30 additions & 1 deletion app/services/chat_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,23 @@ async def _try_return_to_parent_flow(
else:
result["messages"].append(return_result)

# Pause if the return node (e.g. composite sub-flow) requests ack.
# Save the session position at the paused node in the sub-flow so
# that the MESSAGE branch of process_interaction can resume correctly.
if return_result.get("wait_for_acknowledgment"):
paused_node_id = return_result.get("node_id")
if paused_node_id:
session = await chat_repo.update_session_state(
db,
session_id=session.id,
state_updates={},
current_node_id=paused_node_id,
expected_revision=session.revision,
)
result["wait_for_acknowledgment"] = True
result["current_node_id"] = paused_node_id
return result

# Continue processing chain until question or end.
# Track the result that produced next_node so we can
# extract sub_flow_id from the correct source.
Expand Down Expand Up @@ -1688,12 +1705,24 @@ async def _try_return_to_parent_flow(
NodeType.MESSAGE,
NodeType.ACTION,
NodeType.CONDITION,
NodeType.COMPOSITE,
):
node_result = await self.process_node(db, next_node, session)
session = await self._refresh_session(db, session)
if node_result:
result["messages"].append(node_result)
if node_result.get("type") == "messages":
result["messages"].extend(
node_result.get("messages", [])
)
else:
result["messages"].append(node_result)
source_result = node_result

# Pause if the result requests acknowledgment
if node_result and node_result.get("wait_for_acknowledgment"):
result["wait_for_acknowledgment"] = True
break

next_node = (
node_result.get("next_node") if node_result else None
)
Expand Down
11 changes: 11 additions & 0 deletions app/services/node_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,17 @@ async def _process_subflow(
messages = result.get("messages", [])
all_messages.extend(messages)

# Pause processing if the message requests acknowledgment
if result.get("wait_for_acknowledgment"):
final_result = {
"type": "messages",
"messages": all_messages,
"wait_for_acknowledgment": True,
"node_id": result.get("node_id"),
"next_node": result.get("next_node"),
}
break

# Check if there's a next node to process
next_node = result.get("next_node")
if next_node:
Expand Down
Loading