Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 218 additions & 0 deletions app/services/action_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- api_call: Internal API calls with authentication
- delete_variable: Remove variables
- aggregate: Aggregate values from a list using various operations
- emit_event: Fire-and-forget analytics event creation
"""

import datetime
Expand Down Expand Up @@ -223,6 +224,11 @@ async def _execute_actions_sync(
action, current_state, variables_updated
)

elif action_type == "emit_event":
await self._handle_emit_event(
action, current_state, variables_updated, context
)

else:
logger.warning(f"Unknown action type: {action_type}")
errors.append(f"Unknown action type: {action_type}")
Expand Down Expand Up @@ -596,3 +602,215 @@ def _set_nested_value(

# Set the final value
current[keys[-1]] = value

# ------------------------------------------------------------------
# emit_event action
# ------------------------------------------------------------------

async def _handle_emit_event(
self,
action: Dict[str, Any],
state: Dict[str, Any],
updates: Dict[str, Any],
context: Dict[str, Any],
) -> None:
"""Handle emit_event action — fire-and-forget analytics event creation.

Errors are logged but never raised so telemetry cannot break the chat flow.
"""
try:
await self._emit_event_inner(action, state, updates, context)
except Exception:
logger.error(
"emit_event failed (fire-and-forget)",
action_title=action.get("title"),
exc_info=True,
)

async def _emit_event_inner(
self,
action: Dict[str, Any],
state: Dict[str, Any],
updates: Dict[str, Any],
context: Dict[str, Any],
) -> None:
"""Inner implementation for emit_event — may raise on DB errors."""
from app.repositories.event_repository import event_repository

db: AsyncSession | None = context.get("db")
if db is None:
logger.warning("emit_event skipped: no db session in context")
return

# Merge state with updates to get current values
current_state = {**state}
self._deep_merge(current_state, updates)

iterate_over = action.get("iterate_over")
if iterate_over:
await self._emit_iterated_events(
action, current_state, context, db, event_repository
)
return

title = self.runtime.substitute_variables(
action.get("title", ""), current_state
)
description = action.get("description")
if description:
description = self.runtime.substitute_variables(description, current_state)

info = self.runtime.substitute_object(action.get("info", {}), current_state)
info = _strip_unresolved_templates(info)

school = await self._resolve_school(db, current_state, context)
service_account = await self._resolve_service_account(db, action, context)

await event_repository.acreate(
session=db,
title=title,
description=description,
info=info,
school=school,
account=service_account,
commit=False,
)
logger.debug("emit_event created", title=title)

async def _emit_iterated_events(
self,
action: Dict[str, Any],
current_state: Dict[str, Any],
context: Dict[str, Any],
db: "AsyncSession",
event_repository: Any,
) -> None:
"""Create one event per item in a list variable."""
iterate_over = action["iterate_over"]
item_alias = action.get("item_alias", "item")
items = self._get_nested_value(current_state, iterate_over)

if not isinstance(items, list):
logger.warning(
"emit_event iterate_over is not a list",
iterate_over=iterate_over,
actual_type=type(items).__name__ if items is not None else "None",
)
return

school = await self._resolve_school(db, current_state, context)
service_account = await self._resolve_service_account(db, action, context)

for item in items:
# Inject item into temp.<alias> scope for template resolution
iter_state = {**current_state}
temp = dict(iter_state.get("temp", {}))
temp[item_alias] = item
iter_state["temp"] = temp

title = self.runtime.substitute_variables(
action.get("title", ""), iter_state
)
description = action.get("description")
if description:
description = self.runtime.substitute_variables(description, iter_state)

info = self.runtime.substitute_object(action.get("info", {}), iter_state)
info = _strip_unresolved_templates(info)

try:
await event_repository.acreate(
session=db,
title=title,
description=description,
info=info,
school=school,
account=service_account,
commit=False,
)
except Exception:
logger.error(
"emit_event iteration failed",
title=title,
exc_info=True,
)

logger.debug(
"emit_event iterated",
title=action.get("title"),
count=len(items),
)

async def _resolve_school(
self,
db: "AsyncSession",
state: Dict[str, Any],
context: Dict[str, Any],
) -> Any:
"""Resolve School from context.school_wriveted_id, cached on context dict."""
cached = context.get("_resolved_school")
if cached is not None:
return cached

from uuid import UUID

from sqlalchemy import select

from app.models.school import School

school_id_str = self._get_nested_value(state, "context.school_wriveted_id")
if not school_id_str:
return None

try:
school_uuid = UUID(str(school_id_str))
result = await db.execute(
select(School).where(School.wriveted_identifier == school_uuid)
)
school = result.scalars().first()
if school:
context["_resolved_school"] = school
return school
except Exception:
logger.warning(
"Could not resolve school for emit_event",
school_id=school_id_str,
exc_info=True,
)
return None

async def _resolve_service_account(
self,
db: "AsyncSession",
action: Dict[str, Any],
context: Dict[str, Any],
) -> Any:
"""Resolve ServiceAccount by name, cached on context dict."""
sa_name = action.get("service_account")
if not sa_name:
return None

cache_key = f"_resolved_sa_{sa_name}"
cached = context.get(cache_key)
if cached is not None:
return cached

from sqlalchemy import select

from app.models.service_account import ServiceAccount

try:
result = await db.execute(
select(ServiceAccount).where(ServiceAccount.name == sa_name)
)
sa = result.scalars().first()
if sa:
context[cache_key] = sa
return sa
except Exception:
logger.warning(
"Could not resolve service account for emit_event",
service_account_name=sa_name,
exc_info=True,
)
return None
Loading
Loading