-
Notifications
You must be signed in to change notification settings - Fork 802
Python: [Breaking] Python: Respond with AgentRunResponse with serialized structured output
#2285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 10 commits
53b6cc0
262c6f5
92260d7
47ef15f
51b710d
8d645c9
01495e3
f3bfc8e
cd07aab
50dcf68
cc504c5
22cd334
2a0ca25
804b4bb
62f1b51
62a79de
fbff509
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,7 @@ | |
|
|
||
| import asyncio | ||
| import inspect | ||
| import json | ||
| from collections.abc import AsyncIterable, Callable | ||
| from datetime import datetime, timezone | ||
| from typing import Any, cast | ||
|
|
||
| import azure.durable_functions as df | ||
|
|
@@ -30,11 +28,10 @@ | |
| DurableAgentState, | ||
| DurableAgentStateData, | ||
| DurableAgentStateEntry, | ||
| DurableAgentStateMessage, | ||
| DurableAgentStateRequest, | ||
| DurableAgentStateResponse, | ||
| ) | ||
| from ._models import AgentResponse, RunRequest | ||
| from ._models import RunRequest | ||
|
|
||
| logger = get_logger("agent_framework.azurefunctions.entities") | ||
|
|
||
|
|
@@ -97,21 +94,16 @@ async def run_agent( | |
| self, | ||
| context: df.DurableEntityContext, | ||
| request: RunRequest | dict[str, Any] | str, | ||
| ) -> dict[str, Any]: | ||
| ) -> AgentRunResponse: | ||
| """Execute the agent with a message directly in the entity. | ||
|
|
||
| Args: | ||
| context: Entity context | ||
| request: RunRequest object, dict, or string message (for backward compatibility) | ||
|
|
||
| Returns: | ||
| Dict with status information and response (serialized AgentResponse) | ||
|
|
||
| Note: | ||
| The agent returns an AgentRunResponse object which is stored in state. | ||
| This method extracts the text/structured response and returns an AgentResponse dict. | ||
| AgentRunResponse enriched with execution metadata. | ||
| """ | ||
| # Convert string or dict to RunRequest | ||
| if isinstance(request, str): | ||
| run_request = RunRequest(message=request, role=Role.USER) | ||
| elif isinstance(request, dict): | ||
|
|
@@ -135,8 +127,6 @@ async def run_agent( | |
| logger.debug(f"[AgentEntity.run_agent] Received Message: {state_request}") | ||
|
|
||
| try: | ||
| logger.debug("[AgentEntity.run_agent] Starting agent invocation") | ||
|
|
||
| # Build messages from conversation history, excluding error responses | ||
| # Error responses are kept in history for tracking but not sent to the agent | ||
| chat_messages: list[ChatMessage] = [ | ||
|
|
@@ -164,83 +154,42 @@ async def run_agent( | |
| type(agent_run_response).__name__, | ||
| ) | ||
|
|
||
| response_text = None | ||
| structured_response = None | ||
| response_str: str | None = None | ||
|
|
||
| try: | ||
| if response_format: | ||
| try: | ||
| response_str = agent_run_response.text | ||
| structured_response = json.loads(response_str) | ||
| logger.debug("Parsed structured JSON response") | ||
| except json.JSONDecodeError as decode_error: | ||
| logger.warning(f"Failed to parse JSON response: {decode_error}") | ||
| response_text = response_str | ||
| else: | ||
| raw_text = agent_run_response.text | ||
| response_text = raw_text if raw_text else "No response" | ||
| preview = response_text | ||
| logger.debug(f"Response: {preview[:100]}..." if len(preview) > 100 else f"Response: {preview}") | ||
| response_text = agent_run_response.text if agent_run_response.text else "No response" | ||
| logger.debug(f"Response: {response_text[:100]}...") | ||
| except Exception as extraction_error: | ||
| logger.error( | ||
| f"Error extracting response: {extraction_error}", | ||
| "Error extracting response text: %s", | ||
| extraction_error, | ||
| exc_info=True, | ||
| ) | ||
| response_text = "Error extracting response" | ||
|
|
||
| state_response = DurableAgentStateResponse.from_run_response(correlation_id, agent_run_response) | ||
| self.state.data.conversation_history.append(state_response) | ||
|
|
||
| agent_response = AgentResponse( | ||
| response=response_text, | ||
| message=str(message), | ||
| thread_id=str(thread_id), | ||
| status="success", | ||
| message_count=len(self.state.data.conversation_history), | ||
| structured_response=structured_response, | ||
| ) | ||
| result = agent_response.to_dict() | ||
|
|
||
| logger.debug("[AgentEntity.run_agent] AgentRunResponse stored in conversation history") | ||
|
|
||
| return result | ||
| return agent_run_response | ||
|
|
||
| except Exception as exc: | ||
| import traceback | ||
|
|
||
| error_traceback = traceback.format_exc() | ||
| logger.error("[AgentEntity.run_agent] Agent execution failed") | ||
| logger.error(f"Error: {exc!s}") | ||
| logger.error(f"Error type: {type(exc).__name__}") | ||
| logger.error(f"Full traceback:\n{error_traceback}") | ||
| logger.error(f"[AgentEntity.run_agent] Agent execution failed. Full traceback:\n{error_traceback}") | ||
|
|
||
| # Create error message | ||
| error_message = DurableAgentStateMessage.from_chat_message( | ||
| ChatMessage( | ||
| role=Role.ASSISTANT, contents=[ErrorContent(message=str(exc), error_code=type(exc).__name__)] | ||
| ) | ||
| error_message = ChatMessage( | ||
| role=Role.ASSISTANT, contents=[ErrorContent(message=str(exc), error_code=type(exc).__name__)] | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like you're creating a custom error response from the entity when something unexpected happens, but I don't believe we're doing this in .NET. I'm also not sure if this is the right thing for us to do since it hides the fact that an error occurred from Functions telemetry. Regardless of what design we think is best, we should do the same thing in both languages. Can you open an issue to review the implementation and track consistent error handling for agent invocations?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, the experiences should be consistent. |
||
| ) | ||
|
|
||
| error_response = AgentRunResponse(messages=[error_message]) | ||
|
|
||
| # Create and store error response in conversation history | ||
| error_state_response = DurableAgentStateResponse( | ||
| correlation_id=correlation_id, | ||
| created_at=datetime.now(tz=timezone.utc), | ||
| messages=[error_message], | ||
| is_error=True, | ||
| ) | ||
| error_state_response = DurableAgentStateResponse.from_run_response(correlation_id, error_response) | ||
| error_state_response.is_error = True | ||
| self.state.data.conversation_history.append(error_state_response) | ||
|
|
||
| error_response = AgentResponse( | ||
| response=f"Error: {exc!s}", | ||
| message=str(message), | ||
| thread_id=str(thread_id), | ||
| status="error", | ||
| message_count=len(self.state.data.conversation_history), | ||
| error=str(exc), | ||
| error_type=type(exc).__name__, | ||
| ) | ||
| return error_response.to_dict() | ||
| return error_response | ||
|
|
||
| async def _invoke_agent( | ||
| self, | ||
|
|
@@ -432,7 +381,7 @@ async def _entity_coroutine(context: df.DurableEntityContext) -> None: | |
| request = "" if input_data is None else str(cast(object, input_data)) | ||
|
|
||
| result = await entity.run_agent(context, request) | ||
| context.set_result(result) | ||
| context.set_result(result.to_dict()) | ||
|
|
||
| elif operation == "reset": | ||
| entity.reset(context) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.