Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ archive/
# Temporary debug/test scripts
debug_*.py
test_*.py
!app/tests/**/test_*.py
fix_*.py
activate_*.py
get_*_token.py
Expand All @@ -53,6 +54,7 @@ e2e_tests/
*.json
!pyproject.toml
!package.json
!scripts/fixtures/*.json
landbot_extraction_output/

# Credentials (should never be committed)
Expand Down
40 changes: 37 additions & 3 deletions app/services/action_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""

import datetime
import re
from typing import Any, Dict

from sqlalchemy.ext.asyncio import AsyncSession
Expand All @@ -29,6 +30,25 @@
logger = get_logger()


_TEMPLATE_RE = re.compile(r"\{\{.*?\}\}")


def _strip_unresolved_templates(obj: Any) -> Any:
"""Replace unresolved ``{{...}}`` template strings with ``None``.

Only matches actual template syntax (``{{var}}``) — stray ``{{`` or ``}}``
in isolation are left untouched. Recursively processes dicts and lists.
"""
if isinstance(obj, str) and _TEMPLATE_RE.search(obj):
logger.debug("Stripping unresolved template", value=obj)
return None
elif isinstance(obj, dict):
return {k: _strip_unresolved_templates(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [_strip_unresolved_templates(v) for v in obj]
return obj


def _extract_nested(data: Dict[str, Any], key_path: str) -> Any:
"""Extract a value from a nested dict using dot notation."""
keys = key_path.split(".")
Expand Down Expand Up @@ -319,13 +339,27 @@ async def _handle_api_call(
resolved_body = self.runtime.substitute_object(
api_config_data.get("body", {}), state
)
resolved_body = _strip_unresolved_templates(resolved_body)
resolved_params = self.runtime.substitute_object(
api_config_data.get("query_params", {}), state
)
resolved_params = _strip_unresolved_templates(resolved_params)

result_data = await INTERNAL_HANDLERS[endpoint](
db, resolved_body, resolved_params
)
try:
result_data = await INTERNAL_HANDLERS[endpoint](
db, resolved_body, resolved_params
)
except Exception:
logger.error(
"Internal handler failed",
endpoint=endpoint,
exc_info=True,
)
fallback = api_config_data.get("fallback_response")
if fallback is not None:
result_data = fallback
else:
raise

response_mapping = api_config_data.get("response_mapping", {})
for response_path, variable_name in response_mapping.items():
Expand Down
73 changes: 59 additions & 14 deletions app/services/chat_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ class FlowNotFoundError(Exception):
pass


def _to_uuid(value: Any) -> Optional[UUID]:
"""Convert a value to UUID, accepting both str and UUID inputs."""
if value is None:
return None
return UUID(value) if isinstance(value, str) else value


def sanitize_user_input(user_input: str) -> str:
"""Sanitize user input to prevent XSS attacks.

Expand Down Expand Up @@ -1212,7 +1219,7 @@ async def process_interaction(
# If the processed node (e.g., message) has a next_node that's a question,
# position the session at that question node so user input goes there
session_position = response["next_node"].node_id
session_flow_id: Optional[UUID] = None
session_flow_id: Optional[UUID] = response["next_node"].flow_id
chained_next_node = (
next_result.get("next_node") if next_result else None
)
Expand All @@ -1230,9 +1237,13 @@ async def process_interaction(
isinstance(next_result, dict)
and next_result.get("type") == "question"
):
# Action/condition node auto-chained into a question
# Action/condition/composite node auto-chained into a question
awaiting_input = True
session_position = next_result.get("node_id", session_position)
# Preserve sub-flow context when a composite node returns a question directly
sub_flow_id = _to_uuid(next_result.get("sub_flow_id"))
if sub_flow_id:
session_flow_id = sub_flow_id
result["input_request"] = self._build_input_request(next_result)
if chained_next_node:
# Handle FlowNode objects
Expand All @@ -1255,13 +1266,9 @@ async def process_interaction(
if node_id:
session_position = node_id
# Get sub_flow_id from the parent result (composite node)
sub_flow_id = next_result.get("sub_flow_id")
sub_flow_id = _to_uuid(next_result.get("sub_flow_id"))
if sub_flow_id:
session_flow_id = (
UUID(sub_flow_id)
if isinstance(sub_flow_id, str)
else sub_flow_id
)
session_flow_id = sub_flow_id
awaiting_input = True
result["input_request"] = self._build_input_request(
chained_next_node
Expand Down Expand Up @@ -1644,21 +1651,32 @@ async def _try_return_to_parent_flow(
session = await self._refresh_session(db, session)

if return_result:
# Extract actual messages from the result
if return_result.get("type") == "messages":
result["messages"].append(return_result)
result["messages"].extend(return_result.get("messages", []))
else:
result["messages"].append(return_result)

# Continue processing chain until question or end
# Continue processing chain until question or end.
# Track the result that produced next_node so we can
# extract sub_flow_id from the correct source.
next_node = return_result.get("next_node")
source_result = return_result
while next_node:
if isinstance(next_node, FlowNode):
if next_node.node_type == NodeType.QUESTION:
# Update session position to question
# Build input_request and persist options
(
result["input_request"],
q_options,
session,
) = await self._resolve_question_node(db, next_node, session)
q_state = {"system": {"_current_options": q_options}}
session = await self._refresh_session(db, session)
session = await chat_repo.update_session_state(
db,
session_id=session.id,
state_updates={},
state_updates=q_state,
current_node_id=next_node.node_id,
current_flow_id=next_node.flow_id,
expected_revision=session.revision,
Expand All @@ -1675,19 +1693,46 @@ async def _try_return_to_parent_flow(
session = await self._refresh_session(db, session)
if node_result:
result["messages"].append(node_result)
source_result = node_result
next_node = (
node_result.get("next_node") if node_result else None
)
else:
break
elif (
isinstance(next_node, dict) and next_node.get("type") == "question"
):
# Dict question result from composite node sub-flow
node_id = next_node.get("node_id")
flow_id = _to_uuid(
source_result.get("sub_flow_id") if source_result else None
)
if node_id:
result["current_node_id"] = node_id
options = next_node.get("options", [])
session = await chat_repo.update_session_state(
db,
session_id=session.id,
state_updates={"system": {"_current_options": options}},
current_node_id=node_id,
current_flow_id=flow_id,
expected_revision=session.revision,
)
result["input_request"] = self._build_input_request(next_node)
result["awaiting_input"] = True
break
else:
break

# If no next node after processing, check for another parent level
if not next_node and not result.get("awaiting_input"):
result["session_ended"] = True
# Recursively check for more parent flows
if flow_stack:
# Re-read session flow_stack: the return node may have been a
# composite that pushed new entries (e.g., recommendation sub-flow
# ran entirely within CompositeNodeProcessor)
session = await self._refresh_session(db, session)
current_stack = session.info.get("flow_stack", [])
if current_stack:
return await self._try_return_to_parent_flow(db, session, result)

return result
Expand Down
Loading
Loading