Skip to content

Commit 1c379b1

Browse files
authored
Fix COMPOSITE node chaining in process_interaction (#595)
* Fix COMPOSITE nodes not processed in interaction chaining loops The two while loops in process_interaction that chain through non-interactive nodes (ACTION, CONDITION, MESSAGE) did not handle COMPOSITE nodes, causing the session to stall when a MESSAGE node chained to a COMPOSITE node (e.g., stale_collection_msg → profile_composite). Add NodeType.COMPOSITE to both while loops and handle dict question results that composite sub-flows return when entering a sub-flow with a question node. * Persist session state before break and update E2E test Add update_session_state call before break in both while loops when a composite returns a question directly, ensuring session position is persisted to DB before exiting. Without this, the next user reply could resolve against the wrong node/flow. Update E2E test script to walk through composite sub-flows (recommendation, jokes, spelling) and add Test 5 for the stale collection → composite chaining path. * Add emit_event action type for flow-driven analytics events Implement a generic emit_event action in ActionNodeProcessor that creates events directly from flow configurations, matching the 12 event types produced by the production Landbot chatbot. - Fire-and-forget: errors logged but never break the chat flow - Template substitution: {{var}} resolution in title, description, info - iterate_over: create per-item events (e.g. one per book reviewed) - School/ServiceAccount resolution cached per action execution - Seed huey-events service account in admin seed script - Add emit_event nodes to all 4 Huey flow fixtures - 6 new unit tests, E2E event verification * Extract _process_chained_node helper to deduplicate interaction chaining loops Both QUESTION and MESSAGE response branches in process_interaction contained nearly identical ~140-line chaining loops. Extract shared logic into a single _process_chained_node method that handles QUESTION, ACTION/CONDITION/COMPOSITE, and MESSAGE node types uniformly. Also document the emit_event action type across chatflow-node-types, node-schemas-reference, and architecture docs.
1 parent 533d9b2 commit 1c379b1

13 files changed

+1586
-182
lines changed

app/services/action_processor.py

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- api_call: Internal API calls with authentication
88
- delete_variable: Remove variables
99
- aggregate: Aggregate values from a list using various operations
10+
- emit_event: Fire-and-forget analytics event creation
1011
"""
1112

1213
import datetime
@@ -223,6 +224,11 @@ async def _execute_actions_sync(
223224
action, current_state, variables_updated
224225
)
225226

227+
elif action_type == "emit_event":
228+
await self._handle_emit_event(
229+
action, current_state, variables_updated, context
230+
)
231+
226232
else:
227233
logger.warning(f"Unknown action type: {action_type}")
228234
errors.append(f"Unknown action type: {action_type}")
@@ -596,3 +602,215 @@ def _set_nested_value(
596602

597603
# Set the final value
598604
current[keys[-1]] = value
605+
606+
# ------------------------------------------------------------------
607+
# emit_event action
608+
# ------------------------------------------------------------------
609+
610+
async def _handle_emit_event(
611+
self,
612+
action: Dict[str, Any],
613+
state: Dict[str, Any],
614+
updates: Dict[str, Any],
615+
context: Dict[str, Any],
616+
) -> None:
617+
"""Handle emit_event action — fire-and-forget analytics event creation.
618+
619+
Errors are logged but never raised so telemetry cannot break the chat flow.
620+
"""
621+
try:
622+
await self._emit_event_inner(action, state, updates, context)
623+
except Exception:
624+
logger.error(
625+
"emit_event failed (fire-and-forget)",
626+
action_title=action.get("title"),
627+
exc_info=True,
628+
)
629+
630+
async def _emit_event_inner(
631+
self,
632+
action: Dict[str, Any],
633+
state: Dict[str, Any],
634+
updates: Dict[str, Any],
635+
context: Dict[str, Any],
636+
) -> None:
637+
"""Inner implementation for emit_event — may raise on DB errors."""
638+
from app.repositories.event_repository import event_repository
639+
640+
db: AsyncSession | None = context.get("db")
641+
if db is None:
642+
logger.warning("emit_event skipped: no db session in context")
643+
return
644+
645+
# Merge state with updates to get current values
646+
current_state = {**state}
647+
self._deep_merge(current_state, updates)
648+
649+
iterate_over = action.get("iterate_over")
650+
if iterate_over:
651+
await self._emit_iterated_events(
652+
action, current_state, context, db, event_repository
653+
)
654+
return
655+
656+
title = self.runtime.substitute_variables(
657+
action.get("title", ""), current_state
658+
)
659+
description = action.get("description")
660+
if description:
661+
description = self.runtime.substitute_variables(description, current_state)
662+
663+
info = self.runtime.substitute_object(action.get("info", {}), current_state)
664+
info = _strip_unresolved_templates(info)
665+
666+
school = await self._resolve_school(db, current_state, context)
667+
service_account = await self._resolve_service_account(db, action, context)
668+
669+
await event_repository.acreate(
670+
session=db,
671+
title=title,
672+
description=description,
673+
info=info,
674+
school=school,
675+
account=service_account,
676+
commit=False,
677+
)
678+
logger.debug("emit_event created", title=title)
679+
680+
async def _emit_iterated_events(
681+
self,
682+
action: Dict[str, Any],
683+
current_state: Dict[str, Any],
684+
context: Dict[str, Any],
685+
db: "AsyncSession",
686+
event_repository: Any,
687+
) -> None:
688+
"""Create one event per item in a list variable."""
689+
iterate_over = action["iterate_over"]
690+
item_alias = action.get("item_alias", "item")
691+
items = self._get_nested_value(current_state, iterate_over)
692+
693+
if not isinstance(items, list):
694+
logger.warning(
695+
"emit_event iterate_over is not a list",
696+
iterate_over=iterate_over,
697+
actual_type=type(items).__name__ if items is not None else "None",
698+
)
699+
return
700+
701+
school = await self._resolve_school(db, current_state, context)
702+
service_account = await self._resolve_service_account(db, action, context)
703+
704+
for item in items:
705+
# Inject item into temp.<alias> scope for template resolution
706+
iter_state = {**current_state}
707+
temp = dict(iter_state.get("temp", {}))
708+
temp[item_alias] = item
709+
iter_state["temp"] = temp
710+
711+
title = self.runtime.substitute_variables(
712+
action.get("title", ""), iter_state
713+
)
714+
description = action.get("description")
715+
if description:
716+
description = self.runtime.substitute_variables(description, iter_state)
717+
718+
info = self.runtime.substitute_object(action.get("info", {}), iter_state)
719+
info = _strip_unresolved_templates(info)
720+
721+
try:
722+
await event_repository.acreate(
723+
session=db,
724+
title=title,
725+
description=description,
726+
info=info,
727+
school=school,
728+
account=service_account,
729+
commit=False,
730+
)
731+
except Exception:
732+
logger.error(
733+
"emit_event iteration failed",
734+
title=title,
735+
exc_info=True,
736+
)
737+
738+
logger.debug(
739+
"emit_event iterated",
740+
title=action.get("title"),
741+
count=len(items),
742+
)
743+
744+
async def _resolve_school(
745+
self,
746+
db: "AsyncSession",
747+
state: Dict[str, Any],
748+
context: Dict[str, Any],
749+
) -> Any:
750+
"""Resolve School from context.school_wriveted_id, cached on context dict."""
751+
cached = context.get("_resolved_school")
752+
if cached is not None:
753+
return cached
754+
755+
from uuid import UUID
756+
757+
from sqlalchemy import select
758+
759+
from app.models.school import School
760+
761+
school_id_str = self._get_nested_value(state, "context.school_wriveted_id")
762+
if not school_id_str:
763+
return None
764+
765+
try:
766+
school_uuid = UUID(str(school_id_str))
767+
result = await db.execute(
768+
select(School).where(School.wriveted_identifier == school_uuid)
769+
)
770+
school = result.scalars().first()
771+
if school:
772+
context["_resolved_school"] = school
773+
return school
774+
except Exception:
775+
logger.warning(
776+
"Could not resolve school for emit_event",
777+
school_id=school_id_str,
778+
exc_info=True,
779+
)
780+
return None
781+
782+
async def _resolve_service_account(
783+
self,
784+
db: "AsyncSession",
785+
action: Dict[str, Any],
786+
context: Dict[str, Any],
787+
) -> Any:
788+
"""Resolve ServiceAccount by name, cached on context dict."""
789+
sa_name = action.get("service_account")
790+
if not sa_name:
791+
return None
792+
793+
cache_key = f"_resolved_sa_{sa_name}"
794+
cached = context.get(cache_key)
795+
if cached is not None:
796+
return cached
797+
798+
from sqlalchemy import select
799+
800+
from app.models.service_account import ServiceAccount
801+
802+
try:
803+
result = await db.execute(
804+
select(ServiceAccount).where(ServiceAccount.name == sa_name)
805+
)
806+
sa = result.scalars().first()
807+
if sa:
808+
context[cache_key] = sa
809+
return sa
810+
except Exception:
811+
logger.warning(
812+
"Could not resolve service account for emit_event",
813+
service_account_name=sa_name,
814+
exc_info=True,
815+
)
816+
return None

0 commit comments

Comments
 (0)