Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@

import asyncio
import inspect
import json
from collections.abc import AsyncIterable, Callable
from datetime import datetime, timezone
from typing import Any, cast

import azure.durable_functions as df
Expand All @@ -30,11 +28,10 @@
DurableAgentState,
DurableAgentStateData,
DurableAgentStateEntry,
DurableAgentStateMessage,
DurableAgentStateRequest,
DurableAgentStateResponse,
)
from ._models import AgentResponse, RunRequest
from ._models import RunRequest

logger = get_logger("agent_framework.azurefunctions.entities")

Expand Down Expand Up @@ -97,21 +94,16 @@ async def run_agent(
self,
context: df.DurableEntityContext,
request: RunRequest | dict[str, Any] | str,
) -> dict[str, Any]:
) -> AgentRunResponse:
"""Execute the agent with a message directly in the entity.

Args:
context: Entity context
request: RunRequest object, dict, or string message (for backward compatibility)

Returns:
Dict with status information and response (serialized AgentResponse)

Note:
The agent returns an AgentRunResponse object which is stored in state.
This method extracts the text/structured response and returns an AgentResponse dict.
AgentRunResponse enriched with execution metadata.
"""
# Convert string or dict to RunRequest
if isinstance(request, str):
run_request = RunRequest(message=request, role=Role.USER)
elif isinstance(request, dict):
Expand All @@ -135,8 +127,6 @@ async def run_agent(
logger.debug(f"[AgentEntity.run_agent] Received Message: {state_request}")

try:
logger.debug("[AgentEntity.run_agent] Starting agent invocation")

# Build messages from conversation history, excluding error responses
# Error responses are kept in history for tracking but not sent to the agent
chat_messages: list[ChatMessage] = [
Expand Down Expand Up @@ -164,83 +154,42 @@ async def run_agent(
type(agent_run_response).__name__,
)

response_text = None
structured_response = None
response_str: str | None = None

try:
if response_format:
try:
response_str = agent_run_response.text
structured_response = json.loads(response_str)
logger.debug("Parsed structured JSON response")
except json.JSONDecodeError as decode_error:
logger.warning(f"Failed to parse JSON response: {decode_error}")
response_text = response_str
else:
raw_text = agent_run_response.text
response_text = raw_text if raw_text else "No response"
preview = response_text
logger.debug(f"Response: {preview[:100]}..." if len(preview) > 100 else f"Response: {preview}")
response_text = agent_run_response.text if agent_run_response.text else "No response"
logger.debug(f"Response: {response_text[:100]}...")
except Exception as extraction_error:
logger.error(
f"Error extracting response: {extraction_error}",
"Error extracting response text: %s",
extraction_error,
exc_info=True,
)
response_text = "Error extracting response"

state_response = DurableAgentStateResponse.from_run_response(correlation_id, agent_run_response)
self.state.data.conversation_history.append(state_response)

agent_response = AgentResponse(
response=response_text,
message=str(message),
thread_id=str(thread_id),
status="success",
message_count=len(self.state.data.conversation_history),
structured_response=structured_response,
)
result = agent_response.to_dict()

logger.debug("[AgentEntity.run_agent] AgentRunResponse stored in conversation history")

return result
return agent_run_response

except Exception as exc:
import traceback

error_traceback = traceback.format_exc()
logger.error("[AgentEntity.run_agent] Agent execution failed")
logger.error(f"Error: {exc!s}")
logger.error(f"Error type: {type(exc).__name__}")
logger.error(f"Full traceback:\n{error_traceback}")
logger.error(f"[AgentEntity.run_agent] Agent execution failed. Full traceback:\n{error_traceback}")

# Create error message
error_message = DurableAgentStateMessage.from_chat_message(
ChatMessage(
role=Role.ASSISTANT, contents=[ErrorContent(message=str(exc), error_code=type(exc).__name__)]
)
error_message = ChatMessage(
role=Role.ASSISTANT, contents=[ErrorContent(message=str(exc), error_code=type(exc).__name__)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you're creating a custom error response from the entity when something unexpected happens, but I don't believe we're doing this in .NET. I'm also not sure if this is the right thing for us to do since it hides the fact that an error occurred from Functions telemetry. Regardless of what design we think is best, we should do the same thing in both languages. Can you open an issue to review the implementation and track consistent error handling for agent invocations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, the experiences should be consistent.
Created an issue to track this separately - #2434

)

error_response = AgentRunResponse(messages=[error_message])

# Create and store error response in conversation history
error_state_response = DurableAgentStateResponse(
correlation_id=correlation_id,
created_at=datetime.now(tz=timezone.utc),
messages=[error_message],
is_error=True,
)
error_state_response = DurableAgentStateResponse.from_run_response(correlation_id, error_response)
error_state_response.is_error = True
self.state.data.conversation_history.append(error_state_response)

error_response = AgentResponse(
response=f"Error: {exc!s}",
message=str(message),
thread_id=str(thread_id),
status="error",
message_count=len(self.state.data.conversation_history),
error=str(exc),
error_type=type(exc).__name__,
)
return error_response.to_dict()
return error_response

async def _invoke_agent(
self,
Expand Down Expand Up @@ -432,7 +381,7 @@ async def _entity_coroutine(context: df.DurableEntityContext) -> None:
request = "" if input_data is None else str(cast(object, input_data))

result = await entity.run_agent(context, request)
context.set_result(result)
context.set_result(result.to_dict())

elif operation == "reset":
entity.reset(context)
Expand All @@ -442,8 +391,9 @@ async def _entity_coroutine(context: df.DurableEntityContext) -> None:
logger.error("[entity_function] Unknown operation: %s", operation)
context.set_result({"error": f"Unknown operation: {operation}"})

logger.debug("State dict: %s", entity.state.to_dict())
context.set_state(entity.state.to_dict())
serialized_state = entity.state.to_dict()
logger.debug("State dict: %s", serialized_state)
context.set_state(serialized_state)
logger.info(f"[entity_function] Operation {operation} completed successfully")

except Exception as exc:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,50 +362,3 @@ def from_dict(cls, data: dict[str, Any]) -> RunRequest:
correlation_id=data.get("correlationId"),
created_at=data.get("created_at"),
)


@dataclass
class AgentResponse:
"""Response from agent execution.

Attributes:
response: The agent's text response (or None for structured responses)
message: The original message sent to the agent
thread_id: The thread identifier
status: Status of the execution (success, error, etc.)
message_count: Number of messages in the conversation
error: Error message if status is error
error_type: Type of error if status is error
structured_response: Structured response if response_format was provided
"""

response: str | None
message: str
thread_id: str | None
status: str
message_count: int = 0
error: str | None = None
error_type: str | None = None
structured_response: dict[str, Any] | None = None

def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
result: dict[str, Any] = {
"message": self.message,
"thread_id": self.thread_id,
"status": self.status,
"message_count": self.message_count,
}

# Add response or structured_response based on what's available
if self.structured_response is not None:
result["structured_response"] = self.structured_response
elif self.response is not None:
result["response"] = self.response

if self.error:
result["error"] = self.error
if self.error_type:
result["error_type"] = self.error_type

return result
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,27 @@
"""

import uuid
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Generator
from typing import TYPE_CHECKING, Any, TypeAlias, cast

from agent_framework import AgentProtocol, AgentRunResponseUpdate, AgentThread, ChatMessage, get_logger
from agent_framework import (
AgentProtocol,
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
ChatMessage,
get_logger,
)
from pydantic import BaseModel

from ._models import AgentSessionId, DurableAgentThread, RunRequest

logger = get_logger("agent_framework.azurefunctions.orchestration")

if TYPE_CHECKING:
from azure.durable_functions import DurableOrchestrationContext as _DurableOrchestrationContext
from azure.durable_functions import DurableOrchestrationContext

AgentOrchestrationContextType: TypeAlias = _DurableOrchestrationContext
AgentOrchestrationContextType: TypeAlias = DurableOrchestrationContext
else:
AgentOrchestrationContextType = Any

Expand Down Expand Up @@ -81,13 +89,18 @@ def description(self) -> str | None:
"""Get the description of the agent."""
return self._description

def run(
# We return a Generator[Any, Any, ...] here because Durable Functions orchestrations
# require yielding Tasks, and the run method must return a Task that can be yielded.
# This is an intentional deviation from AgentProtocol which defines run() as async.
# The Generator type is correct for Durable Functions orchestrations.
def run( # type: ignore[override]
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
response_format: type[BaseModel] | None = None,
**kwargs: Any,
) -> Any: # TODO(msft-team): Add a wrapper to respond correctly with `AgentRunResponse`
) -> Generator[Any, Any, AgentRunResponse]:
"""Execute the agent with messages and return a Task for orchestrations.

This method implements AgentProtocol and returns a Task that can be yielded
Expand All @@ -96,23 +109,24 @@ def run(
Args:
messages: The message(s) to send to the agent
thread: Optional agent thread for conversation context
**kwargs: Additional arguments (enable_tool_calls, response_format, etc.)
response_format: Optional Pydantic model for response parsing
**kwargs: Additional arguments (enable_tool_calls)

Returns:
Task that will resolve to the agent response
Yield a task that will resolve to the agent response

Example:
@app.orchestration_trigger(context_name="context")
def my_orchestration(context):
agent = app.get_agent(context, "MyAgent")
thread = agent.get_new_thread()
result = yield agent.run("Hello", thread=thread)
result = yield from agent.run("Hello", thread=thread)
"""
message_str = self._normalize_messages(messages)
logger.debug(f"[DurableAIAgent] Running agent '{self.agent_name}' with message: {message_str[:100]}...")

# Extract optional parameters from kwargs
enable_tool_calls = kwargs.get("enable_tool_calls", True)
response_format = kwargs.get("response_format")

# Get the session ID for the entity
if isinstance(thread, DurableAgentThread) and thread.session_id is not None:
Expand All @@ -130,6 +144,12 @@ def my_orchestration(context):
# Generate a deterministic correlation ID for this call
# This is required by the entity and must be unique per call
correlation_id = str(self.context.new_uuid())
logger.debug(
"[DurableAIAgent] Using correlation_id: %s for entity_id: %s for session_id: %s",
correlation_id,
entity_id,
session_id,
)

# Prepare the request using RunRequest model
run_request = RunRequest(
Expand All @@ -144,7 +164,57 @@ def my_orchestration(context):

# Call the entity and return the Task directly
# The orchestration will yield this Task
return self.context.call_entity(entity_id, "run_agent", run_request.to_dict())
result = yield self.context.call_entity(entity_id, "run_agent", run_request.to_dict())

logger.debug(
"[DurableAIAgent] Entity call completed for correlation_id %s",
correlation_id,
)

response = self._load_agent_response(result)

if response_format is not None:
self._ensure_response_format(response_format, correlation_id, response)

return response

def _load_agent_response(self, agent_response: AgentRunResponse | dict[str, Any] | None) -> AgentRunResponse:
"""Convert raw payloads into AgentRunResponse instance."""
if agent_response is None:
raise ValueError("agent_response cannot be None")

logger.debug(f"[load_agent_response] Loading agent response of type: {type(agent_response)}")

if isinstance(agent_response, AgentRunResponse):
return agent_response
if isinstance(agent_response, dict):
logger.debug("[load_agent_response] Converting dict payload using AgentRunResponse.from_dict")
return AgentRunResponse.from_dict(agent_response)

raise TypeError(f"Unsupported type for agent_response: {type(agent_response)}")

def _ensure_response_format(
self,
response_format: type[BaseModel] | None,
correlation_id: str,
response: AgentRunResponse,
) -> None:
"""Ensure the AgentRunResponse value is parsed into the expected response_format."""
if response_format is not None and not isinstance(response.value, response_format):
logger.debug(
"[DurableAIAgent] Response value type %s does not match expected %s for correlation_id %s",
type(response.value),
response_format,
correlation_id,
)

response.try_parse_value(response_format)

logger.debug(
"[DurableAIAgent] Loaded AgentRunResponse.value for correlation_id %s with type: %s",
correlation_id,
type(response.value),
)

def run_stream(
self,
Expand Down
Loading
Loading