From bf964b87667f070b3166d8344179906ece6dc5d1 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Sat, 7 Feb 2026 14:49:58 +1300 Subject: [PATCH 1/2] Add wait_for_acknowledgment support and fix MESSAGE node chaining Allow MESSAGE nodes with `wait_for_ack: true` to pause flow processing and return accumulated messages to the client. The frontend can then auto-continue after a delay (e.g., showing a searching animation). Changes: - Add `wait_for_acknowledgment` field to InteractionResponse schema - Pass wait_for_acknowledgment through the chat API endpoint - Check for wait_for_acknowledgment in QUESTION branch while loop - Rewrite MESSAGE branch in process_interaction to properly chain through ACTION, CONDITION, MESSAGE, and QUESTION nodes when resuming from a paused message (continueFlow) - Fix double-processing bug: skip while loop when awaiting_input is already True, preventing CMS random questions from being fetched twice --- app/api/chat.py | 1 + app/schemas/cms.py | 1 + app/services/chat_runtime.py | 150 ++++++++++++++++++++++++++++++----- 3 files changed, 130 insertions(+), 22 deletions(-) diff --git a/app/api/chat.py b/app/api/chat.py index c158bd9a..e45fb38c 100644 --- a/app/api/chat.py +++ b/app/api/chat.py @@ -293,6 +293,7 @@ async def interact_with_session( session_ended=response.get("session_ended", False), current_node_id=response.get("current_node_id"), session_updated=response.get("session_updated"), + wait_for_acknowledgment=response.get("wait_for_acknowledgment", False), ) except IntegrityError: diff --git a/app/schemas/cms.py b/app/schemas/cms.py index 75fb32bd..ee19503e 100644 --- a/app/schemas/cms.py +++ b/app/schemas/cms.py @@ -475,6 +475,7 @@ class InteractionResponse(BaseModel): session_ended: bool = False current_node_id: Optional[str] = None session_updated: Optional[Dict[str, Any]] = None + wait_for_acknowledgment: bool = False class ConversationHistoryDetail(BaseModel): diff --git a/app/services/chat_runtime.py b/app/services/chat_runtime.py index 15a41867..e1b4d551 100644 --- a/app/services/chat_runtime.py +++ b/app/services/chat_runtime.py @@ -1331,8 +1331,9 @@ async def process_interaction( ) # Continue processing through non-interactive nodes (action, condition) - # until we hit a question node or the flow ends - while chained_next_node: + # until we hit a question node or the flow ends. + # Skip if awaiting_input is already True (question was handled above). + while chained_next_node and not awaiting_input: # If next node is a FlowNode if isinstance(chained_next_node, FlowNode): if chained_next_node.node_type == NodeType.QUESTION: @@ -1433,6 +1434,11 @@ async def process_interaction( current_flow_id=session_flow_id, expected_revision=session.revision, ) + + # Pause if message requests acknowledgment + if msg_result and msg_result.get("wait_for_acknowledgment"): + result["wait_for_acknowledgment"] = True + break else: # Unknown node type, stop processing break @@ -1449,7 +1455,7 @@ async def process_interaction( result["session_ended"] = True elif current_node.node_type == NodeType.MESSAGE: - # For message nodes, just continue to next + # Continue from a message node (e.g., after wait_for_acknowledgment) processor = MessageNodeProcessor(self) next_connection = await processor.get_next_connection(db, current_node) @@ -1463,38 +1469,138 @@ async def process_interaction( next_result = await self.process_node(db, next_node, session) session = await self._refresh_session(db, session) - # Extract messages properly from the result if next_result and next_result.get("type") == "messages": result["messages"] = next_result.get("messages", []) else: result["messages"] = [next_result] if next_result else [] - # Determine session position: if the processed node's next_node is a QUESTION, - # position the session at that QUESTION so user input goes there session_position = next_node.node_id + session_flow_id: Optional[UUID] = next_node.flow_id + awaiting_input = False chained_next = next_result.get("next_node") if next_result else None - if chained_next and isinstance(chained_next, FlowNode): + + # Check if the first processed node is a question + if next_node.node_type == NodeType.QUESTION: + awaiting_input = True + if next_result and next_result.get("type") == "question": + result["input_request"] = { + "input_type": next_result.get("input_type", "text"), + "variable": next_result.get("variable", ""), + "options": next_result.get("options", []), + "question": next_result.get("question", {}), + } + + # Chain through non-interactive nodes until question or end + while chained_next and not awaiting_input: + if not isinstance(chained_next, FlowNode): + break + if chained_next.node_type == NodeType.QUESTION: session_position = chained_next.node_id - # Set input_request for the chained question + session_flow_id = chained_next.flow_id + awaiting_input = True q_content = chained_next.content or {} - result["input_request"] = { - "input_type": q_content.get("input_type", "text"), - "variable": q_content.get("variable", ""), - "options": q_content.get("options", []), - "question": q_content.get("question", {}), - } + + if q_content.get("source") == "random": + session = await self._refresh_session(db, session) + q_result = await self.process_node( + db, chained_next, session + ) + session = await self._refresh_session(db, session) + result["input_request"] = { + "input_type": q_result.get("input_type", "text"), + "variable": q_result.get("variable", ""), + "options": q_result.get("options", []), + "question": q_result.get("question", {}), + } + else: + session = await self._refresh_session(db, session) + q_result = await self.process_node( + db, chained_next, session + ) + session = await self._refresh_session(db, session) + result["input_request"] = { + "input_type": q_result.get("input_type", "text"), + "variable": q_result.get("variable", ""), + "options": q_result.get("options", []), + "question": q_result.get("question", {}), + } + + q_options = result["input_request"].get("options", []) + q_state = {"system": {"_current_options": q_options}} + session = await self._refresh_session(db, session) + session = await chat_repo.update_session_state( + db, + session_id=session.id, + state_updates=q_state, + current_node_id=session_position, + current_flow_id=session_flow_id, + expected_revision=session.revision, + ) + break + + elif chained_next.node_type in ( + NodeType.ACTION, + NodeType.CONDITION, + ): + auto_result = await self.process_node( + db, chained_next, session + ) + session = await self._refresh_session(db, session) + if auto_result and auto_result.get("type") == "messages": + result["messages"].extend( + auto_result.get("messages", []) + ) + session_position = chained_next.node_id + session_flow_id = chained_next.flow_id + chained_next = auto_result.get("next_node") + + session = await chat_repo.update_session_state( + db, + session_id=session.id, + state_updates={}, + current_node_id=session_position, + current_flow_id=session_flow_id, + expected_revision=session.revision, + ) + + elif chained_next.node_type == NodeType.MESSAGE: + msg_result = await self.process_node( + db, chained_next, session + ) + session = await self._refresh_session(db, session) + if msg_result and msg_result.get("type") == "messages": + result["messages"].extend( + msg_result.get("messages", []) + ) + elif msg_result: + result["messages"].append(msg_result) + session_position = chained_next.node_id + session_flow_id = chained_next.flow_id + chained_next = ( + msg_result.get("next_node") if msg_result else None + ) + + session = await chat_repo.update_session_state( + db, + session_id=session.id, + state_updates={}, + current_node_id=session_position, + current_flow_id=session_flow_id, + expected_revision=session.revision, + ) + + if msg_result and msg_result.get("wait_for_acknowledgment"): + result["wait_for_acknowledgment"] = True + break + + else: + break result["current_node_id"] = session_position - # Update session's current node position - session = await chat_repo.update_session_state( - db, - session_id=session.id, - state_updates={}, # No state changes, just position update - current_node_id=session_position, - expected_revision=session.revision, - ) + if not chained_next and not awaiting_input: + result["session_ended"] = True else: result["session_ended"] = True else: From 09d318faa521f6b4b775921b58c7c4ec16c80eb6 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Sat, 7 Feb 2026 15:20:07 +1300 Subject: [PATCH 2/2] =?UTF-8?q?Fix=20MESSAGE=E2=86=92QUESTION=20session=20?= =?UTF-8?q?state=20persistence?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a MESSAGE node's next node is a QUESTION, persist current_node_id and _current_options via chat_repo.update_session_state so the session points at the correct QUESTION node for the next user response. Also handle the ACTION→QUESTION recursive chain case. --- app/services/chat_runtime.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/app/services/chat_runtime.py b/app/services/chat_runtime.py index e1b4d551..92c2a65e 100644 --- a/app/services/chat_runtime.py +++ b/app/services/chat_runtime.py @@ -1489,6 +1489,36 @@ async def process_interaction( "options": next_result.get("options", []), "question": next_result.get("question", {}), } + elif ( + isinstance(next_result, dict) + and next_result.get("type") == "question" + ): + # ACTION/CONDITION node auto-chained into a question + awaiting_input = True + session_position = next_result.get("node_id", session_position) + result["input_request"] = { + "input_type": next_result.get("input_type", "text"), + "variable": next_result.get("variable", ""), + "options": next_result.get("options", []), + "question": next_result.get("question", {}), + } + + # Persist session position and options when awaiting input + # (the chaining loop below handles its own persistence) + if awaiting_input: + options_to_store = result.get("input_request", {}).get( + "options", [] + ) + q_state = {"system": {"_current_options": options_to_store}} + session = await self._refresh_session(db, session) + session = await chat_repo.update_session_state( + db, + session_id=session.id, + state_updates=q_state, + current_node_id=session_position, + current_flow_id=session_flow_id, + expected_revision=session.revision, + ) # Chain through non-interactive nodes until question or end while chained_next and not awaiting_input: