diff --git a/app/services/action_processor.py b/app/services/action_processor.py index 1527f353..c3818474 100644 --- a/app/services/action_processor.py +++ b/app/services/action_processor.py @@ -85,8 +85,11 @@ async def process( node_content = node.content or {} actions = node_content.get("actions", []) - # Determine if this should be processed asynchronously - async_actions = {"api_call", "external_service", "heavy_computation"} + # Determine if this should be processed asynchronously. + # Note: api_call is intentionally excluded — chat flows need synchronous + # results (e.g. book recommendations) so the flow can continue immediately. + # Use the explicit "async": true flag on individual actions if needed. + async_actions = {"external_service", "heavy_computation"} should_async = any( action.get("type") in async_actions or action.get("async", False) for action in actions diff --git a/app/services/chat_runtime.py b/app/services/chat_runtime.py index ea90e000..0703ff88 100644 --- a/app/services/chat_runtime.py +++ b/app/services/chat_runtime.py @@ -1657,6 +1657,23 @@ async def _try_return_to_parent_flow( else: result["messages"].append(return_result) + # Pause if the return node (e.g. composite sub-flow) requests ack. + # Save the session position at the paused node in the sub-flow so + # that the MESSAGE branch of process_interaction can resume correctly. + if return_result.get("wait_for_acknowledgment"): + paused_node_id = return_result.get("node_id") + if paused_node_id: + session = await chat_repo.update_session_state( + db, + session_id=session.id, + state_updates={}, + current_node_id=paused_node_id, + expected_revision=session.revision, + ) + result["wait_for_acknowledgment"] = True + result["current_node_id"] = paused_node_id + return result + # Continue processing chain until question or end. # Track the result that produced next_node so we can # extract sub_flow_id from the correct source. @@ -1688,12 +1705,24 @@ async def _try_return_to_parent_flow( NodeType.MESSAGE, NodeType.ACTION, NodeType.CONDITION, + NodeType.COMPOSITE, ): node_result = await self.process_node(db, next_node, session) session = await self._refresh_session(db, session) if node_result: - result["messages"].append(node_result) + if node_result.get("type") == "messages": + result["messages"].extend( + node_result.get("messages", []) + ) + else: + result["messages"].append(node_result) source_result = node_result + + # Pause if the result requests acknowledgment + if node_result and node_result.get("wait_for_acknowledgment"): + result["wait_for_acknowledgment"] = True + break + next_node = ( node_result.get("next_node") if node_result else None ) diff --git a/app/services/node_processors.py b/app/services/node_processors.py index 58d682c4..8bd49d2b 100644 --- a/app/services/node_processors.py +++ b/app/services/node_processors.py @@ -663,6 +663,17 @@ async def _process_subflow( messages = result.get("messages", []) all_messages.extend(messages) + # Pause processing if the message requests acknowledgment + if result.get("wait_for_acknowledgment"): + final_result = { + "type": "messages", + "messages": all_messages, + "wait_for_acknowledgment": True, + "node_id": result.get("node_id"), + "next_node": result.get("next_node"), + } + break + # Check if there's a next node to process next_node = result.get("next_node") if next_node: