Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 94 additions & 89 deletions docs/DeploymentGuide.md

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions src/backend/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Execute backend API Service

```shell
uv run uvicorn app_kernel:app --port 8000
```
uv run uvicorn app:app --port 8000
```
264 changes: 105 additions & 159 deletions src/backend/af/callbacks/response_handlers.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
"""
Agent Framework response callbacks for employee onboarding / multi-agent system.
Replaces Semantic Kernel message types with agent_framework ChatResponseUpdate handling.
Enhanced response callbacks (agent_framework version) for employee onboarding agent system.
"""

import asyncio
import json
import logging
import re
import time
from typing import Optional

from agent_framework import (
ChatResponseUpdate,
FunctionCallContent,
UsageContent,
Role,
TextContent,
)
import re
from typing import Any

from agent_framework import ChatMessage
# Removed: from agent_framework._content import FunctionCallContent (does not exist)

from agent_framework._workflows._magentic import AgentRunResponseUpdate # Streaming update type from workflows

from af.config.settings import connection_config
from af.models.messages import (
Expand All @@ -30,177 +25,128 @@
logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Utility
# ---------------------------------------------------------------------------

_CITATION_PATTERNS = [
(r"\[\d+:\d+\|source\]", ""), # [9:0|source]
(r"\[\s*source\s*\]", ""), # [source]
(r"\[\d+\]", ""), # [12]
(r"【[^】]*】", ""), # Unicode bracket citations
(r"\(source:[^)]*\)", ""), # (source: xyz)
(r"\[source:[^\]]*\]", ""), # [source: xyz]
]


def clean_citations(text: str) -> str:
"""Remove citation markers from agent responses while preserving formatting."""
if not text:
return text
for pattern, repl in _CITATION_PATTERNS:
text = re.sub(pattern, repl, text, flags=re.IGNORECASE)
text = re.sub(r'\[\d+:\d+\|source\]', '', text)
text = re.sub(r'\[\s*source\s*\]', '', text, flags=re.IGNORECASE)
text = re.sub(r'\[\d+\]', '', text)
text = re.sub(r'【[^】]*】', '', text)
text = re.sub(r'\(source:[^)]*\)', '', text, flags=re.IGNORECASE)
text = re.sub(r'\[source:[^\]]*\]', '', text, flags=re.IGNORECASE)
return text


def _parse_function_arguments(arg_value: Optional[str | dict]) -> dict:
"""Best-effort parse for function call arguments (stringified JSON or dict)."""
if arg_value is None:
return {}
if isinstance(arg_value, dict):
return arg_value
if isinstance(arg_value, str):
try:
return json.loads(arg_value)
except Exception: # noqa: BLE001
return {"raw": arg_value}
return {"raw": str(arg_value)}

def _is_function_call_item(item: Any) -> bool:
"""Heuristic to detect a function/tool call item without relying on SK class types."""
if item is None:
return False
# Common SK attributes: content_type == "function_call"
if getattr(item, "content_type", None) == "function_call":
return True
# Agent framework may surface something with name & arguments but no text
if hasattr(item, "name") and hasattr(item, "arguments") and not hasattr(item, "text"):
return True
return False


def _extract_tool_calls_from_contents(contents: list[Any]) -> list[AgentToolCall]:
"""Convert function/tool call-like items into AgentToolCall objects via duck typing."""
tool_calls: list[AgentToolCall] = []
for item in contents:
if _is_function_call_item(item):
tool_calls.append(
AgentToolCall(
tool_name=getattr(item, "name", "unknown_tool"),
arguments=getattr(item, "arguments", {}) or {},
)
)
return tool_calls

# ---------------------------------------------------------------------------
# Core handlers
# ---------------------------------------------------------------------------

def agent_framework_update_callback(
update: ChatResponseUpdate,
user_id: Optional[str] = None,
def agent_response_callback(
agent_id: str,
message: ChatMessage,
user_id: str | None = None,
) -> None:
"""
Handle a non-streaming perspective of updates (tool calls, intermediate steps, final usage).
This can be called for each ChatResponseUpdate; it will route tool calls and standard text
messages to WebSocket.
Final (non-streaming) agent response callback using agent_framework ChatMessage.
"""
agent_name = getattr(update, "model_id", None) or "Agent"
# Use Role or fallback
role = getattr(update, "role", Role.ASSISTANT)
agent_name = getattr(message, "author_name", None) or agent_id or "Unknown Agent"
role = getattr(message, "role", "assistant")
text = clean_citations(getattr(message, "text", "") or "")

# Detect tool/function calls
function_call_contents = [
c for c in (update.contents or [])
if isinstance(c, FunctionCallContent)
]

if user_id is None:
if not user_id:
logger.debug("No user_id provided; skipping websocket send for final message.")
return

try:
if function_call_contents:
# Build tool message
tool_message = AgentToolMessage(agent_name=agent_name)
for fc in function_call_contents:
args = _parse_function_arguments(getattr(fc, "arguments", None))
tool_message.tool_calls.append(
AgentToolCall(
tool_name=getattr(fc, "name", "unknown_tool"),
arguments=args,
)
)
asyncio.create_task(
connection_config.send_status_update_async(
tool_message,
user_id,
message_type=WebsocketMessageType.AGENT_TOOL_MESSAGE,
)
)
logger.info("Function call(s) dispatched: %s", tool_message)
return

# Ignore pure usage or empty updates (handled as final in streaming handler)
if any(isinstance(c, UsageContent) for c in (update.contents or [])):
# We'll treat this as a final token accounting event; no standard message needed.
logger.debug("UsageContent received (final accounting); skipping text dispatch.")
return

# Standard assistant/user message (non-stream delta)
if update.text:
final_message = AgentMessage(
agent_name=agent_name,
timestamp=str(time.time()),
content=clean_citations(update.text),
)
asyncio.create_task(
connection_config.send_status_update_async(
final_message,
user_id,
message_type=WebsocketMessageType.AGENT_MESSAGE,
)
final_message = AgentMessage(
agent_name=agent_name,
timestamp=time.time(),
content=text,
)
asyncio.create_task(
connection_config.send_status_update_async(
final_message,
user_id,
message_type=WebsocketMessageType.AGENT_MESSAGE,
)
logger.info("%s message: %s", role.name.capitalize(), final_message)

)
logger.info("%s message (agent=%s): %s", str(role).capitalize(), agent_name, text[:200])
except Exception as e: # noqa: BLE001
logger.error("agent_framework_update_callback: Error sending WebSocket message: %s", e)
logger.error("agent_response_callback error sending WebSocket message: %s", e)


async def streaming_agent_framework_callback(
update: ChatResponseUpdate,
user_id: Optional[str] = None,
async def streaming_agent_response_callback(
agent_id: str,
update: AgentRunResponseUpdate,
is_final: bool,
user_id: str | None = None,
) -> None:
"""
Handle streaming deltas. For each update with text, forward a streaming message.
Mark is_final=True when a UsageContent is observed (end of run).
Streaming callback for incremental agent output (AgentRunResponseUpdate).
"""
if user_id is None:
if not user_id:
return

try:
# Determine if this update marks the end
is_final = any(isinstance(c, UsageContent) for c in (update.contents or []))

# Streaming text can appear either in update.text or inside TextContent entries.
pieces: list[str] = []
if update.text:
pieces.append(update.text)
# Some events may provide TextContent objects without setting update.text
for c in (update.contents or []):
if isinstance(c, TextContent) and getattr(c, "text", None):
pieces.append(c.text)

if not pieces:
return

streaming_message = AgentMessageStreaming(
agent_name=getattr(update, "model_id", None) or "Agent",
content=clean_citations("".join(pieces)),
is_final=is_final,
)

await connection_config.send_status_update_async(
streaming_message,
user_id,
message_type=WebsocketMessageType.AGENT_MESSAGE_STREAMING,
)

if is_final:
logger.info("Final streaming chunk sent for agent '%s'", streaming_message.agent_name)
chunk_text = getattr(update, "text", None)
if not chunk_text:
contents = getattr(update, "contents", []) or []
collected = []
for item in contents:
txt = getattr(item, "text", None)
if txt:
collected.append(str(txt))
chunk_text = "".join(collected) if collected else ""

cleaned = clean_citations(chunk_text or "")

contents = getattr(update, "contents", []) or []
tool_calls = _extract_tool_calls_from_contents(contents)
if tool_calls:
tool_message = AgentToolMessage(agent_name=agent_id)
tool_message.tool_calls.extend(tool_calls)
await connection_config.send_status_update_async(
tool_message,
user_id,
message_type=WebsocketMessageType.AGENT_TOOL_MESSAGE,
)
logger.info("Tool calls streamed from %s: %d", agent_id, len(tool_calls))

if cleaned:
streaming_payload = AgentMessageStreaming(
agent_name=agent_id,
content=cleaned,
is_final=is_final,
)
await connection_config.send_status_update_async(
streaming_payload,
user_id,
message_type=WebsocketMessageType.AGENT_MESSAGE_STREAMING,
)
logger.debug("Streaming chunk (agent=%s final=%s len=%d)", agent_id, is_final, len(cleaned))
except Exception as e: # noqa: BLE001
logger.error("streaming_agent_framework_callback: Error sending streaming WebSocket message: %s", e)


# ---------------------------------------------------------------------------
# Convenience wrappers (optional)
# ---------------------------------------------------------------------------

def handle_update(update: ChatResponseUpdate, user_id: Optional[str]) -> None:
"""
Unified entry point if caller doesn't distinguish streaming vs non-streaming.
You can call this once per update. It will:
- Forward streaming text increments
- Forward tool calls
- Skip purely usage-only events (except marking final in streaming)
"""
# Send streaming chunk first (async context)
asyncio.create_task(streaming_agent_framework_callback(update, user_id))
# Then send non-stream items (tool calls or discrete messages)
agent_framework_update_callback(update, user_id)

logger.error("streaming_agent_response_callback error: %s", e)
8 changes: 4 additions & 4 deletions src/backend/af/common/services/agents_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
methods to convert a TeamConfiguration into a list/array of agent descriptors.

This is intentionally a simple skeleton — the user will later provide the
implementation that wires these descriptors into Semantic Kernel / Foundry
implementation that wires these descriptors into agent framework / Foundry
agent instances.
"""

import logging
from typing import Any, Dict, List, Union

from common.models.messages_kernel import TeamAgent, TeamConfiguration
from common.models.messages_af import TeamAgent, TeamConfiguration
from af.common.services.team_service import TeamService


Expand All @@ -26,7 +26,7 @@ class AgentsService:
returns a list of agent descriptors. Descriptors are plain dicts that
contain the fields required to later instantiate runtime agents.

The concrete instantiation logic (semantic kernel / foundry) is intentionally
The concrete instantiation logic (agent framework / foundry) is intentionally
left out and should be implemented by the user later (see
`instantiate_agents` placeholder).
"""
Expand Down Expand Up @@ -109,7 +109,7 @@ async def get_agents_from_team_config(
async def instantiate_agents(self, agent_descriptors: List[Dict[str, Any]]):
"""Placeholder for instantiating runtime agent objects from descriptors.

The real implementation should create Semantic Kernel / Foundry agents
The real implementation should create agent framework / Foundry agents
and attach them to each descriptor under the key `agent_obj` or return a
list of instantiated agents.

Expand Down
4 changes: 2 additions & 2 deletions src/backend/af/common/services/plan_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def build_agent_message_from_user_clarification(
"""
Convert a UserClarificationResponse (human feedback) into an AgentMessageData.
"""
# NOTE: AgentMessageType enum currently defines values with trailing commas in messages_kernel.py.
# NOTE: AgentMessageType enum currently defines values with trailing commas in messages_af.py.
# e.g. HUMAN_AGENT = "Human_Agent", -> value becomes ('Human_Agent',)
# Consider fixing that enum (remove trailing commas) so .value is a string.
return AgentMessageData(
Expand All @@ -43,7 +43,7 @@ def build_agent_message_from_agent_message_response(
user_id: str,
) -> AgentMessageData:
"""
Convert a messages.AgentMessageResponse into common.models.messages_kernel.AgentMessageData.
Convert a messages.AgentMessageResponse into common.models.messages_af.AgentMessageData.
This is defensive: it tolerates missing fields and different timestamp formats.
"""
# Robust timestamp parsing (accepts seconds or ms or missing)
Expand Down
Loading
Loading