Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/api/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ async def interact_with_session(
session_ended=response.get("session_ended", False),
current_node_id=response.get("current_node_id"),
session_updated=response.get("session_updated"),
wait_for_acknowledgment=response.get("wait_for_acknowledgment", False),
)

except IntegrityError:
Expand Down
1 change: 1 addition & 0 deletions app/schemas/cms.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ class InteractionResponse(BaseModel):
session_ended: bool = False
current_node_id: Optional[str] = None
session_updated: Optional[Dict[str, Any]] = None
wait_for_acknowledgment: bool = False


class ConversationHistoryDetail(BaseModel):
Expand Down
180 changes: 158 additions & 22 deletions app/services/chat_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1331,8 +1331,9 @@ async def process_interaction(
)

# Continue processing through non-interactive nodes (action, condition)
# until we hit a question node or the flow ends
while chained_next_node:
# until we hit a question node or the flow ends.
# Skip if awaiting_input is already True (question was handled above).
while chained_next_node and not awaiting_input:
# If next node is a FlowNode
if isinstance(chained_next_node, FlowNode):
if chained_next_node.node_type == NodeType.QUESTION:
Expand Down Expand Up @@ -1433,6 +1434,11 @@ async def process_interaction(
current_flow_id=session_flow_id,
expected_revision=session.revision,
)

# Pause if message requests acknowledgment
if msg_result and msg_result.get("wait_for_acknowledgment"):
result["wait_for_acknowledgment"] = True
break
else:
# Unknown node type, stop processing
break
Expand All @@ -1449,7 +1455,7 @@ async def process_interaction(
result["session_ended"] = True

elif current_node.node_type == NodeType.MESSAGE:
# For message nodes, just continue to next
# Continue from a message node (e.g., after wait_for_acknowledgment)
processor = MessageNodeProcessor(self)
next_connection = await processor.get_next_connection(db, current_node)

Expand All @@ -1463,38 +1469,168 @@ async def process_interaction(
next_result = await self.process_node(db, next_node, session)
session = await self._refresh_session(db, session)

# Extract messages properly from the result
if next_result and next_result.get("type") == "messages":
result["messages"] = next_result.get("messages", [])
else:
result["messages"] = [next_result] if next_result else []

# Determine session position: if the processed node's next_node is a QUESTION,
# position the session at that QUESTION so user input goes there
session_position = next_node.node_id
session_flow_id: Optional[UUID] = next_node.flow_id
awaiting_input = False
chained_next = next_result.get("next_node") if next_result else None
if chained_next and isinstance(chained_next, FlowNode):

# Check if the first processed node is a question
if next_node.node_type == NodeType.QUESTION:
awaiting_input = True
if next_result and next_result.get("type") == "question":
result["input_request"] = {
"input_type": next_result.get("input_type", "text"),
"variable": next_result.get("variable", ""),
"options": next_result.get("options", []),
"question": next_result.get("question", {}),
}
elif (
isinstance(next_result, dict)
and next_result.get("type") == "question"
):
# ACTION/CONDITION node auto-chained into a question
awaiting_input = True
session_position = next_result.get("node_id", session_position)
result["input_request"] = {
"input_type": next_result.get("input_type", "text"),
"variable": next_result.get("variable", ""),
"options": next_result.get("options", []),
"question": next_result.get("question", {}),
}

# Persist session position and options when awaiting input
# (the chaining loop below handles its own persistence)
if awaiting_input:
options_to_store = result.get("input_request", {}).get(
"options", []
)
q_state = {"system": {"_current_options": options_to_store}}
session = await self._refresh_session(db, session)
session = await chat_repo.update_session_state(
db,
session_id=session.id,
state_updates=q_state,
current_node_id=session_position,
current_flow_id=session_flow_id,
expected_revision=session.revision,
)

# Chain through non-interactive nodes until question or end
while chained_next and not awaiting_input:
if not isinstance(chained_next, FlowNode):
break

if chained_next.node_type == NodeType.QUESTION:
session_position = chained_next.node_id
# Set input_request for the chained question
session_flow_id = chained_next.flow_id
awaiting_input = True
q_content = chained_next.content or {}
result["input_request"] = {
"input_type": q_content.get("input_type", "text"),
"variable": q_content.get("variable", ""),
"options": q_content.get("options", []),
"question": q_content.get("question", {}),
}

if q_content.get("source") == "random":
session = await self._refresh_session(db, session)
q_result = await self.process_node(
db, chained_next, session
)
session = await self._refresh_session(db, session)
result["input_request"] = {
"input_type": q_result.get("input_type", "text"),
"variable": q_result.get("variable", ""),
"options": q_result.get("options", []),
"question": q_result.get("question", {}),
}
else:
session = await self._refresh_session(db, session)
q_result = await self.process_node(
db, chained_next, session
)
session = await self._refresh_session(db, session)
result["input_request"] = {
"input_type": q_result.get("input_type", "text"),
"variable": q_result.get("variable", ""),
"options": q_result.get("options", []),
"question": q_result.get("question", {}),
}

q_options = result["input_request"].get("options", [])
q_state = {"system": {"_current_options": q_options}}
session = await self._refresh_session(db, session)
session = await chat_repo.update_session_state(
db,
session_id=session.id,
state_updates=q_state,
current_node_id=session_position,
current_flow_id=session_flow_id,
expected_revision=session.revision,
)
break

elif chained_next.node_type in (
NodeType.ACTION,
NodeType.CONDITION,
):
auto_result = await self.process_node(
db, chained_next, session
)
session = await self._refresh_session(db, session)
if auto_result and auto_result.get("type") == "messages":
result["messages"].extend(
auto_result.get("messages", [])
)
session_position = chained_next.node_id
session_flow_id = chained_next.flow_id
chained_next = auto_result.get("next_node")

session = await chat_repo.update_session_state(
db,
session_id=session.id,
state_updates={},
current_node_id=session_position,
current_flow_id=session_flow_id,
expected_revision=session.revision,
)

elif chained_next.node_type == NodeType.MESSAGE:
msg_result = await self.process_node(
db, chained_next, session
)
session = await self._refresh_session(db, session)
if msg_result and msg_result.get("type") == "messages":
result["messages"].extend(
msg_result.get("messages", [])
)
elif msg_result:
result["messages"].append(msg_result)
session_position = chained_next.node_id
session_flow_id = chained_next.flow_id
chained_next = (
msg_result.get("next_node") if msg_result else None
)

session = await chat_repo.update_session_state(
db,
session_id=session.id,
state_updates={},
current_node_id=session_position,
current_flow_id=session_flow_id,
expected_revision=session.revision,
)

if msg_result and msg_result.get("wait_for_acknowledgment"):
result["wait_for_acknowledgment"] = True
break

else:
break

result["current_node_id"] = session_position

# Update session's current node position
session = await chat_repo.update_session_state(
db,
session_id=session.id,
state_updates={}, # No state changes, just position update
current_node_id=session_position,
expected_revision=session.revision,
)
if not chained_next and not awaiting_input:
result["session_ended"] = True
else:
result["session_ended"] = True
else:
Expand Down
Loading