diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py
index 233436ac84ea..e24558d6def9 100644
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/agent_framework.py
@@ -8,8 +8,9 @@
 from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Optional, Protocol, Union, List
 import inspect
 
-from agent_framework import AgentProtocol, AIFunction
+from agent_framework import AgentProtocol, AIFunction, InMemoryCheckpointStorage
 from agent_framework.azure import AzureAIClient  # pylint: disable=no-name-in-module
+from agent_framework._workflows import get_checkpoint_summary
 from opentelemetry import trace
 
 from azure.ai.agentserver.core.client.tools import OAuthConsentRequiredError
@@ -27,6 +28,7 @@
     AgentFrameworkOutputNonStreamingConverter,
 )
 from .models.agent_framework_output_streaming_converter import AgentFrameworkOutputStreamingConverter
+from .models.human_in_the_loop_helper import HumanInTheLoopHelper
 from .models.constants import Constants
 from .tool_client import ToolClient
 
@@ -85,6 +87,9 @@ def __init__(self, agent: Union[AgentProtocol, AgentFactory],
         super().__init__(credentials=credentials, **kwargs)  # pylint: disable=unexpected-keyword-arg
         self._agent_or_factory: Union[AgentProtocol, AgentFactory] = agent
         self._resolved_agent: "Optional[AgentProtocol]" = None
+        self._hitl_helper = HumanInTheLoopHelper()
+        self._checkpoint_storage = InMemoryCheckpointStorage()
+        self._agent_thread_in_memory = {}
         # If agent is already instantiated, use it directly
         if isinstance(agent, AgentProtocol):
             self._resolved_agent = agent
@@ -187,9 +192,13 @@ def init_tracing(self):
         self.tracer = trace.get_tracer(__name__)
 
     def setup_tracing_with_azure_ai_client(self, project_endpoint: str):
+        logger.info("Setting up tracing with AzureAIClient")
+        logger.info(f"Project endpoint for tracing: {project_endpoint}")
         async def setup_async():
             async with AzureAIClient(
-                project_endpoint=project_endpoint, async_credential=self.credentials
+                project_endpoint=project_endpoint,
+                async_credential=self.credentials,
+                credential=self.credentials,
             ) as agent_client:
                 await agent_client.setup_azure_ai_observability()
 
@@ -223,24 +232,47 @@ async def agent_run(  # pylint: disable=too-many-statements
             logger.info(f"Starting agent_run with stream={context.stream}")
 
             request_input = context.request.get("input")
-
-            input_converter = AgentFrameworkInputConverter()
-            message = input_converter.transform_input(request_input)
+            # TODO: load agent thread from storage and deserialize
+            agent_thread = self._agent_thread_in_memory.get(context.conversation_id, agent.get_new_thread())
+
+            last_checkpoint = None
+            if self._checkpoint_storage:
+                checkpoints = await self._checkpoint_storage.list_checkpoints()
+                last_checkpoint = checkpoints[-1] if checkpoints else None
+                logger.info(f"Last checkpoint data: {last_checkpoint.to_dict() if last_checkpoint else 'None'}")
+                if last_checkpoint:
+                    summary = get_checkpoint_summary(last_checkpoint)
+                    logger.info(f"Last checkpoint summary status: {summary.status}")
+                    if summary.status == "completed":
+                        last_checkpoint = None  # Do not resume from completed checkpoints
+
+            input_converter = AgentFrameworkInputConverter(hitl_helper=self._hitl_helper)
+            message =
await input_converter.transform_input( + request_input, + agent_thread=agent_thread, + checkpoint=last_checkpoint) logger.debug(f"Transformed input message type: {type(message)}") # Use split converters if context.stream: logger.info("Running agent in streaming mode") - streaming_converter = AgentFrameworkOutputStreamingConverter(context) + streaming_converter = AgentFrameworkOutputStreamingConverter(context, hitl_helper=self._hitl_helper) async def stream_updates(): try: update_count = 0 - updates = agent.run_stream(message) + updates = agent.run_stream( + message, + thread=agent_thread, + checkpoint_storage=self._checkpoint_storage, + checkpoint_id=last_checkpoint.checkpoint_id if last_checkpoint else None, + ) async for event in streaming_converter.convert(updates): update_count += 1 yield event - + + if agent_thread: + self._agent_thread_in_memory[context.conversation_id] = agent_thread logger.info("Streaming completed with %d updates", update_count) finally: # Close tool_client if it was created for this request @@ -255,9 +287,15 @@ async def stream_updates(): # Non-streaming path logger.info("Running agent in non-streaming mode") - non_streaming_converter = AgentFrameworkOutputNonStreamingConverter(context) - result = await agent.run(message) - logger.debug(f"Agent run completed, result type: {type(result)}") + non_streaming_converter = AgentFrameworkOutputNonStreamingConverter(context, hitl_helper=self._hitl_helper) + result = await agent.run(message, + thread=agent_thread, + checkpoint_storage=self._checkpoint_storage, + checkpoint_id=last_checkpoint.checkpoint_id if last_checkpoint else None, + ) + logger.info(f"Agent run completed, result type: {type(result)}") + if agent_thread: + self._agent_thread_in_memory[context.conversation_id] = agent_thread transformed_result = non_streaming_converter.transform_output_for_response(result) logger.info("Agent run and transformation completed successfully") return transformed_result @@ -279,3 +317,4 @@ async def oauth_consent_stream(error=e): logger.debug("Closed tool_client after request processing") except Exception as ex: # pylint: disable=broad-exception-caught logger.warning(f"Error closing tool_client: {ex}") + diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py index 993be43e85c8..28cc76b51c32 100644 --- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_input_converters.py @@ -5,9 +5,15 @@ # mypy: disable-error-code="no-redef" from __future__ import annotations -from typing import Dict, List - -from agent_framework import ChatMessage, Role as ChatRole +from typing import Dict, List, Optional + +from agent_framework import ( + AgentThread, + ChatMessage, + RequestInfoEvent, + Role as ChatRole, + WorkflowCheckpoint, +) from agent_framework._types import TextContent from azure.ai.agentserver.core.logger import get_logger @@ -21,10 +27,14 @@ class AgentFrameworkInputConverter: Accepts: str | List | None Returns: None | str | ChatMessage | list[str] | list[ChatMessage] """ + def __init__(self, *, hitl_helper=None) -> None: + self._hitl_helper = hitl_helper - def transform_input( + async def transform_input( self, input: 
str | List[Dict] | None,
+        agent_thread: Optional[AgentThread] = None,
+        checkpoint: Optional[WorkflowCheckpoint] = None,
     ) -> str | ChatMessage | list[str] | list[ChatMessage] | None:
         logger.debug("Transforming input of type: %s", type(input))
 
@@ -33,7 +43,28 @@ def transform_input(
 
         if isinstance(input, str):
             return input
-
+
+        if self._hitl_helper:
+            # load pending requests from checkpoint and thread messages if available
+            thread_messages = []
+            if agent_thread:
+                thread_messages = await agent_thread.message_store.list_messages()
+                logger.info(f"Thread messages count: {len(thread_messages)}")
+            pending_hitl_requests = self._hitl_helper.get_pending_hitl_request(thread_messages, checkpoint)
+            logger.info(f"Pending HitL requests: {list(pending_hitl_requests.keys())}")
+            hitl_response = self._hitl_helper.validate_and_convert_hitl_response(
+                input,
+                pending_requests=pending_hitl_requests)
+            logger.info("HitL response validation result: %s", [m.to_dict() for m in hitl_response] if hitl_response else None)
+            if hitl_response:
+                return hitl_response
+
+        return self._transform_input_internal(input)
+
+    def _transform_input_internal(
+        self,
+        input: str | List[Dict] | None,
+    ) -> str | ChatMessage | list[str] | list[ChatMessage] | None:
         try:
             if isinstance(input, list):
                 messages: list[str | ChatMessage] = []
@@ -118,3 +149,35 @@ def _extract_input_text(self, content_item: Dict) -> str:
         if isinstance(text_content, str):
             return text_content
         return None  # type: ignore
+
+    def _validate_and_convert_hitl_response(
+        self,
+        pending_request: Dict,
+        input: List[Dict],
+    ) -> Optional[List[ChatMessage]]:
+        if not self._hitl_helper:
+            logger.warning("HitL helper not provided; cannot validate HitL response.")
+            return None
+        if isinstance(input, str):
+            logger.warning("Expected list input for HitL response validation, got str.")
+            return None
+        if not isinstance(input, list) or len(input) != 1:
+            logger.warning("Expected single-item list input for HitL response validation.")
+            return None
+
+        item = input[0]
+        if item.get("type") != "function_call_output":
+            logger.warning("Expected function_call_output type for HitL response validation.")
+            return None
+        call_id = item.get("call_id", None)
+        if not call_id or call_id not in pending_request:
+            logger.warning("Function call output missing valid call_id for HitL response validation.")
+            return None
+        request_info = pending_request[call_id]
+        if isinstance(request_info, dict):
+            request_info = RequestInfoEvent.from_dict(request_info)
+        if not isinstance(request_info, RequestInfoEvent):
+            logger.warning("No valid pending request info found for call_id: %s", call_id)
+            return None
+
+        return self._hitl_helper.convert_response(request_info, item)
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py
index fbece993305a..08db24adfae0 100644
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_non_streaming_converter.py
@@ -7,7 +7,14 @@
 import json
 from typing import Any, List
 
-from agent_framework import AgentRunResponse, FunctionCallContent, FunctionResultContent, ErrorContent, TextContent
+from agent_framework import (
AgentRunResponse,
+    FunctionCallContent,
+    FunctionResultContent,
+    ErrorContent,
+    TextContent,
+)
+from agent_framework._types import UserInputRequestContents
 
 from azure.ai.agentserver.core import AgentRunContext
 from azure.ai.agentserver.core.logger import get_logger
@@ -21,6 +28,7 @@
 
 from .agent_id_generator import AgentIdGenerator
 from .constants import Constants
+from .human_in_the_loop_helper import HumanInTheLoopHelper
 
 logger = get_logger()
 
@@ -28,10 +36,11 @@
 class AgentFrameworkOutputNonStreamingConverter:  # pylint: disable=name-too-long
     """Non-streaming converter: AgentRunResponse -> OpenAIResponse."""
 
-    def __init__(self, context: AgentRunContext):
+    def __init__(self, context: AgentRunContext, *, hitl_helper: HumanInTheLoopHelper):
         self._context = context
         self._response_id = None
         self._response_created_at = None
+        self._hitl_helper = hitl_helper
 
     def _ensure_response_started(self) -> None:
         if not self._response_id:
@@ -120,6 +129,8 @@ def _append_content_item(self, content: Any, sink: List[dict], author_name: str)
             self._append_function_call_content(content, sink, author_name)
         elif isinstance(content, FunctionResultContent):
             self._append_function_result_content(content, sink, author_name)
+        elif isinstance(content, UserInputRequestContents):
+            self._append_user_input_request_contents(content, sink, author_name)
         elif isinstance(content, ErrorContent):
             raise ValueError(f"ErrorContent received: code={content.error_code}, message={content.message}")
         else:
@@ -205,6 +216,22 @@ def _append_function_result_content(self, content: FunctionResultContent, sink:
             call_id,
             len(result),
         )
+
+    def _append_user_input_request_contents(self, content: UserInputRequestContents, sink: List[dict], author_name: str) -> None:
+        item_id = self._context.id_generator.generate_function_call_id()
+        converted = self._hitl_helper.convert_user_input_request_content(content)
+        sink.append(
+            {
+                "id": item_id,
+                "type": "function_call",
+                "status": "in_progress",
+                "call_id": converted["call_id"],
+                "name": converted["name"],
+                "arguments": converted["arguments"],
+                "created_by": self._build_created_by(author_name),
+            }
+        )
+        logger.debug("  added user_input_request item id=%s call_id=%s", item_id, converted["call_id"])
 
     # ------------- simple normalization helper -------------------------
     def _coerce_result_text(self, value: Any) -> str | dict:
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py
index 92f1cb983e08..23d8702e38ec 100644
--- a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_framework_output_streaming_converter.py
@@ -5,15 +5,16 @@
 # mypy: disable-error-code="call-overload,assignment,arg-type,override"
 from __future__ import annotations
 
 import datetime
 import json
-from typing import Any, AsyncIterable, List
+from typing import Any, AsyncIterable, List, Union
 
 from agent_framework import AgentRunResponseUpdate, BaseContent, FunctionApprovalRequestContent, FunctionResultContent
 from agent_framework._types import (
     ErrorContent,
     FunctionCallContent,
     TextContent,
+    UserInputRequestContents,
 )
 
 from azure.ai.agentserver.core import AgentRunContext
@@
-43,6 +45,7 @@ ) from .agent_id_generator import AgentIdGenerator +from .human_in_the_loop_helper import HumanInTheLoopHelper from .utils.async_iter import chunk_on_change, peek @@ -130,46 +133,56 @@ async def convert_contents(self, contents: AsyncIterable[TextContent], author_na class _FunctionCallStreamingState(_BaseStreamingState): """State handler for function_call content during streaming.""" - def __init__(self, parent: AgentFrameworkOutputStreamingConverter): + def __init__(self, + parent: AgentFrameworkOutputStreamingConverter, + hitl_helper: HumanInTheLoopHelper): self._parent = parent + self._hitl_helper = hitl_helper async def convert_contents( - self, contents: AsyncIterable[FunctionCallContent], author_name: str + self, contents: AsyncIterable[Union[FunctionCallContent, UserInputRequestContents]], author_name: str ) -> AsyncIterable[ResponseStreamEvent]: content_by_call_id = {} ids_by_call_id = {} + hitl_contents = [] async for content in contents: - if content.call_id not in content_by_call_id: - item_id = self._parent.context.id_generator.generate_function_call_id() - output_index = self._parent.next_output_index() - - content_by_call_id[content.call_id] = content - ids_by_call_id[content.call_id] = (item_id, output_index) - - yield ResponseOutputItemAddedEvent( - sequence_number=self._parent.next_sequence(), - output_index=output_index, - item=FunctionToolCallItemResource( - id=item_id, - status="in_progress", - call_id=content.call_id, - name=content.name, - arguments="", - created_by=self._parent._build_created_by(author_name), - ), - ) - else: - content_by_call_id[content.call_id] = content_by_call_id[content.call_id] + content - item_id, output_index = ids_by_call_id[content.call_id] - - args_delta = content.arguments if isinstance(content.arguments, str) else "" - yield ResponseFunctionCallArgumentsDeltaEvent( - sequence_number=self._parent.next_sequence(), - item_id=item_id, - output_index=output_index, - delta=args_delta, - ) + if isinstance(content, FunctionCallContent): + if content.call_id not in content_by_call_id: + item_id = self._parent.context.id_generator.generate_function_call_id() + output_index = self._parent.next_output_index() + + content_by_call_id[content.call_id] = content + ids_by_call_id[content.call_id] = (item_id, output_index) + + yield ResponseOutputItemAddedEvent( + sequence_number=self._parent.next_sequence(), + output_index=output_index, + item=FunctionToolCallItemResource( + id=item_id, + status="in_progress", + call_id=content.call_id, + name=content.name, + arguments="", + created_by=self._parent._build_created_by(author_name), + ), + ) + else: + content_by_call_id[content.call_id] = content_by_call_id[content.call_id] + content + item_id, output_index = ids_by_call_id[content.call_id] + + args_delta = content.arguments if isinstance(content.arguments, str) else "" + yield ResponseFunctionCallArgumentsDeltaEvent( + sequence_number=self._parent.next_sequence(), + item_id=item_id, + output_index=output_index, + delta=args_delta, + ) + + elif isinstance(content, UserInputRequestContents): + converted_hitl = self._hitl_helper.convert_user_input_request_content(content) + if converted_hitl: + hitl_contents.append(converted_hitl) for call_id, content in content_by_call_id.items(): item_id, output_index = ids_by_call_id[call_id] @@ -196,6 +209,51 @@ async def convert_contents( ) self._parent.add_completed_output_item(item) # pylint: disable=protected-access + + # process HITL contents after function calls + for content in hitl_contents: + 
item_id = self._parent.context.id_generator.generate_function_call_id()
+            output_index = self._parent.next_output_index()
+
+            yield ResponseOutputItemAddedEvent(
+                sequence_number=self._parent.next_sequence(),
+                output_index=output_index,
+                item=FunctionToolCallItemResource(
+                    id=item_id,
+                    status="in_progress",
+                    call_id=content["call_id"],
+                    name=content["name"],
+                    arguments="",
+                    created_by=self._parent._build_created_by(author_name),
+                ),
+            )
+            yield ResponseFunctionCallArgumentsDeltaEvent(
+                sequence_number=self._parent.next_sequence(),
+                item_id=item_id,
+                output_index=output_index,
+                delta=content["arguments"],
+            )
+
+            yield ResponseFunctionCallArgumentsDoneEvent(
+                sequence_number=self._parent.next_sequence(),
+                item_id=item_id,
+                output_index=output_index,
+                arguments=content["arguments"],
+            )
+            item = FunctionToolCallItemResource(
+                id=item_id,
+                status="in_progress",
+                call_id=content["call_id"],
+                name=content["name"],
+                arguments=content["arguments"],
+                created_by=self._parent._build_created_by(author_name),
+            )
+            yield ResponseOutputItemDoneEvent(
+                sequence_number=self._parent.next_sequence(),
+                output_index=output_index,
+                item=item,
+            )
+            self._parent.add_completed_output_item(item)
 
 
 class _FunctionCallOutputStreamingState(_BaseStreamingState):
@@ -255,7 +313,7 @@ def _to_output(cls, result: Any) -> str:
 
 class AgentFrameworkOutputStreamingConverter:
     """Streaming converter using content-type-specific state handlers."""
 
-    def __init__(self, context: AgentRunContext) -> None:
+    def __init__(self, context: AgentRunContext, *, hitl_helper: HumanInTheLoopHelper | None = None) -> None:
         self._context = context
         # sequence numbers must start at 0 for first emitted event
         self._sequence = -1
@@ -263,6 +321,7 @@ def __init__(self, context: AgentRunContext) -> None:
         self._response_id = self._context.response_id
         self._response_created_at = None
         self._completed_output_items: List[ItemResource] = []
+        self._hitl_helper = hitl_helper
 
     def next_sequence(self) -> int:
         self._sequence += 1
@@ -294,8 +353,12 @@ async def convert(self, updates: AsyncIterable[AgentRunResponseUpdate]) -> Async
         )
 
         is_changed = (
-            lambda a, b: a is not None and b is not None and a.message_id != b.message_id  # pylint: disable=unnecessary-lambda-assignment
+            lambda a, b: a is not None
+            and b is not None
+            and (a.message_id != b.message_id
+                 or (a.contents and b.contents and type(a.contents[0]) is not type(b.contents[0])))  # pylint: disable=unnecessary-lambda-assignment
         )
+
         async for group in chunk_on_change(updates, is_changed):
             has_value, first_tuple, contents_with_author = await peek(self._read_updates(group))
             if not has_value or first_tuple is None:
@@ -306,8 +369,8 @@
             state = None
             if isinstance(first, TextContent):
                 state = _TextContentStreamingState(self)
-            elif isinstance(first, (FunctionCallContent, FunctionApprovalRequestContent)):
-                state = _FunctionCallStreamingState(self)
+            elif isinstance(first, (FunctionCallContent, UserInputRequestContents)):
+                state = _FunctionCallStreamingState(self, self._hitl_helper)
             elif isinstance(first, FunctionResultContent):
                 state = _FunctionCallOutputStreamingState(self)
             elif isinstance(first, ErrorContent):
@@ -350,7 +413,7 @@ async def _read_updates(self, updates: AsyncIterable[AgentRunResponseUpdate]) ->
         accepted_types = (TextContent,
                           FunctionCallContent,
-                          FunctionApprovalRequestContent,
+                          UserInputRequestContents,
                           FunctionResultContent,
                           ErrorContent)
         for content in update.contents:
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_state_inventory.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_state_inventory.py
new file mode 100644
index 000000000000..4195f384dca1
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/agent_state_inventory.py
@@ -0,0 +1,77 @@
+from typing import Any, Optional
+
+from agent_framework import AgentThread, BaseAgent
+
+
+class AgentStateInventory:
+    """Checkpoint inventory to manage saved states of agent threads and workflows."""
+
+    async def get(self, conversation_id: str) -> Optional[Any]:
+        """Retrieve the saved state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+        """
+        raise NotImplementedError
+
+    async def set(self, conversation_id: str, state: Any) -> None:
+        """Save the state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+            state (Any): The state to save.
+        """
+        raise NotImplementedError
+
+
+class InMemoryThreadAgentStateInventory(AgentStateInventory):
+    """In-memory implementation of AgentStateInventory."""
+    def __init__(self, agent: BaseAgent) -> None:
+        self._agent = agent
+        self._inventory: dict[str, Any] = {}
+
+    async def get(self, conversation_id: str) -> Optional[AgentThread]:
+        """Retrieve the saved state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+        """
+        if conversation_id in self._inventory:
+            serialized_thread = self._inventory[conversation_id]
+            return await self._agent.deserialize_thread(serialized_thread)
+        return None
+
+    async def set(self, conversation_id: str, state: AgentThread) -> None:
+        """Save the state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+            state (AgentThread): The state to save.
+        """
+        if conversation_id and state:
+            serialized_thread = await state.serialize()
+            self._inventory[conversation_id] = serialized_thread
+
+
+class InMemoryCheckpointAgentStateInventory(AgentStateInventory):
+    """In-memory implementation of AgentStateInventory for workflow checkpoints."""
+    def __init__(self) -> None:
+        self._inventory: dict[str, Any] = {}
+
+    async def get(self, conversation_id: str) -> Optional[Any]:
+        """Retrieve the saved state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+        """
+        return self._inventory.get(conversation_id, None)
+
+    async def set(self, conversation_id: str, state: Any) -> None:
+        """Save the state for a given conversation ID.
+
+        Args:
+            conversation_id (str): The conversation ID.
+            state (Any): The state to save.
+ """ + if conversation_id and state: + self._inventory[conversation_id] = state \ No newline at end of file diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py new file mode 100644 index 000000000000..30bb3aa8d9c5 --- /dev/null +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/azure/ai/agentserver/agentframework/models/human_in_the_loop_helper.py @@ -0,0 +1,119 @@ +from typing import Any, List, Dict, Optional, Union +import json + +from agent_framework import ( + ChatMessage, + FunctionResultContent, + FunctionApprovalResponseContent, + RequestInfoEvent, + WorkflowCheckpoint, +) +from agent_framework._types import UserInputRequestContents + +from azure.ai.agentserver.core.logger import get_logger +from azure.ai.agentserver.core.server.common.constants import HUMAN_IN_THE_LOOP_FUNCTION_NAME + +logger = get_logger() + +class HumanInTheLoopHelper: + + def get_pending_hitl_request(self, + thread_messages: List[ChatMessage] = None, + checkpoint: Optional[WorkflowCheckpoint] = None, + ) -> dict[str, Union[RequestInfoEvent, Any]]: + res = {} + # if has checkpoint (WorkflowAgent), find pending request info from checkpoint + if checkpoint and checkpoint.pending_request_info_events: + for call_id, request in checkpoint.pending_request_info_events.items(): + # find if the request is already responded in the thread messages + if isinstance(request, dict): + request_obj = RequestInfoEvent.from_dict(request) + res[call_id] = request_obj + return res + + if not thread_messages: + return res + + # if no checkpoint (Agent), find user input request and pair the feedbacks + for message in thread_messages: + for content in message.contents: + print(f" Content {type(content)}: {content.to_dict()}") + if isinstance(content, UserInputRequestContents): + # is a human input request + function_call = content.function_call + call_id = getattr(function_call, "call_id", "") + if call_id: + res[call_id] = RequestInfoEvent( + source_executor_id="agent", + request_id=call_id, + response_type=None, + request_data=function_call, + ) + elif isinstance(content, FunctionResultContent): + if content.call_id and content.call_id in res: + # remove requests that already got feedback + res.pop(content.call_id) + elif isinstance(content, FunctionApprovalResponseContent): + function_call = content.function_call + call_id = getattr(function_call, "call_id", "") + if call_id and call_id in res: + res.pop(call_id) + return res + + def convert_user_input_request_content(self, content: UserInputRequestContents) -> dict: + function_call = content.function_call + call_id = getattr(function_call, "call_id", "") + arguments = self.convert_request_arguments(getattr(function_call, "arguments", "")) + return { + "call_id": call_id, + "name": HUMAN_IN_THE_LOOP_FUNCTION_NAME, + "arguments": arguments or "", + } + + def convert_request_arguments(self, arguments: Any) -> str: + # convert data to payload if possible + if isinstance(arguments, dict): + data = arguments.get("data") + if data and hasattr(data, "convert_to_payload"): + return data.convert_to_payload() + + if not isinstance(arguments, str): + if hasattr(arguments, "to_dict"): # agentframework models have to_dict method + arguments = arguments.to_dict() + try: + arguments = json.dumps(arguments) + except Exception: # pragma: no cover - fallback # pylint: 
+                arguments = str(arguments)
+        return arguments
+
+    def validate_and_convert_hitl_response(self,
+                                           input: str | List[Dict] | None,
+                                           pending_requests: Dict[str, RequestInfoEvent],
+                                           ) -> List[ChatMessage] | None:
+
+        if input is None or isinstance(input, str):
+            logger.warning("Expected list input for HitL response validation, got %s.", type(input))
+            return None
+
+        res = []
+        for item in input:
+            if item.get("type") != "function_call_output":
+                logger.warning("Expected function_call_output type for HitL response validation.")
+                return None
+            call_id = item.get("call_id", None)
+            if call_id and call_id in pending_requests:
+                res.append(self.convert_response(pending_requests[call_id], item))
+        return res
+
+    def convert_response(self, hitl_request: RequestInfoEvent, input: Dict) -> ChatMessage:
+        response_type = hitl_request.response_type
+        response_result = input.get("output", "")
+        logger.info("response_type %s: %s", type(response_type), response_type)
+        if response_type and hasattr(response_type, "convert_from_payload"):
+            response_result = response_type.convert_from_payload(input.get("output", ""))
+        logger.info("response_result %s: %s", type(response_result), response_result)
+        response_content = FunctionResultContent(
+            call_id=hitl_request.request_id,
+            result=response_result,
+        )
+        return ChatMessage(role="tool", contents=[response_content])
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/.envtemplate b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/.envtemplate
new file mode 100644
index 000000000000..bd646f163bb7
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/.envtemplate
@@ -0,0 +1,3 @@
+AZURE_OPENAI_ENDPOINT=https://<your-resource-name>.cognitiveservices.azure.com/
+OPENAI_API_VERSION=2025-03-01-preview
+AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-chat-deployment-name>
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/README.md b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/README.md
new file mode 100644
index 000000000000..19f0335895e3
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/README.md
@@ -0,0 +1,133 @@
+# Human-in-the-Loop Agent Framework Sample
+
+This sample shows how to host a Microsoft Agent Framework workflow inside Azure AI Agent Server while escalating responses to a real human when the reviewer executor decides that manual approval is required.
+
+## Prerequisites
+
+- Python 3.10+ and `pip`
+- Azure CLI logged in with `az login` (used by `AzureCliCredential`)
+- An Azure OpenAI chat deployment
+
+### Environment configuration
+
+1. Copy `.envtemplate` to `.env` and fill in your Azure OpenAI details:
+
+   ```
+   AZURE_OPENAI_ENDPOINT=https://<your-resource-name>.cognitiveservices.azure.com/
+   OPENAI_API_VERSION=2025-03-01-preview
+   AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-chat-deployment-name>
+   ```
+
+2. Create a virtual environment (optional but recommended) and install the sample dependencies:
+
+   ```powershell
+   python -m venv .venv
+   . .venv/Scripts/Activate.ps1
+   pip install -r requirements.txt
+   ```
+
+`main.py` automatically loads the `.env` file before spinning up the server.
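+
+Conceptually, the hosting entry point reduces to the following sketch (simplified from this sample's `main.py`; `build_agent` is the workflow-as-agent factory defined there):
+
+```python
+import asyncio
+
+from dotenv import load_dotenv
+
+from azure.ai.agentserver.agentframework import from_agent_framework
+from main import build_agent  # the workflow-as-agent factory in this sample
+
+load_dotenv()  # pick up the AZURE_OPENAI_* settings before the server starts
+
+
+async def serve() -> None:
+    # Wrap the Agent Framework agent with the hosting adapter and serve /responses.
+    await from_agent_framework(build_agent()).run_async()
+
+
+asyncio.run(serve())
+```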
+
+## Run the workflow-hosted agent
+
+From this directory start the adapter host (defaults to `http://0.0.0.0:8088`):
+
+```powershell
+python main.py
+```
+
+The worker executor produces answers, the reviewer executor always escalates to a person, and the adapter exposes the whole workflow through the `/responses` endpoint.
+
+For the human-in-the-loop scenario, `HumanReviewRequest` and `ReviewResponse` are user-provided classes. They should implement conversion helpers (`convert_to_payload` and `convert_from_payload`) so the adapter can translate the data to and from request payloads.
+
+
+## Send a user request
+
+Save the following payload to `request.json` (adjust the prompt as needed):
+
+```json
+{
+  "input": "Plan a 2-day Seattle trip that balances food and museums.",
+  "stream": false
+}
+```
+
+Then call the server (PowerShell example):
+
+```pwsh
+$body = Get-Content .\request.json -Raw
+Invoke-RestMethod -Uri http://localhost:8088/responses -Method Post -ContentType "application/json" -Body $body `
+    | ConvertTo-Json -Depth 8
+```
+
+A human-review interrupt looks like this (formatted for clarity):
+
+```json
+{
+  "conversation": {"id": "conv_xxx"},
+  "output": [
+    {
+      "type": "function_call",
+      "name": "__hosted_agent_adapter_hitl__",
+      "call_id": "call_xxx",
+      "arguments": "{\"agent_request\":{\"request_id\":\"req_xxx\",...}}"
+    }
+  ]
+}
+```
+
+Capture three values from the response:
+
+- `conversation.id`
+- The `call_id` of the `__hosted_agent_adapter_hitl__` function call
+- The `request_id` inside the serialized `agent_request`
+
+## Provide human feedback
+
+Respond by sending a `function_call_output` message that carries your review decision. Replace the placeholders before running the command:
+
+```pwsh
+$payload = @{
+    stream = $false
+    conversation = @{ id = "<conversation-id>" }
+    input = @(
+        @{
+            type = "function_call_output"
+            call_id = "<call-id>"
+            output = '{"request_id":"<request-id>","feedback":"Approved","approved":true}'
+        }
+    )
+} | ConvertTo-Json -Depth 5
+
+Invoke-RestMethod -Uri http://localhost:8088/responses -Method Post -ContentType "application/json" -Body $payload `
+    | ConvertTo-Json -Depth 8
+```
+
+Update the JSON string in `output` to reject a response:
+
+```json
+{"request_id":"<request-id>","feedback":"Missing safety disclaimers.","approved":false}
+```
+
+Once the reviewer accepts the human feedback, the worker emits the approved assistant response and the HTTP call returns the final output.
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py
new file mode 100644
index 000000000000..abc603f9be53
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/main.py
@@ -0,0 +1,128 @@
+# Copyright (c) Microsoft. All rights reserved.
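+"""Sample entry point: host a workflow agent with a human-in-the-loop reviewer.
+
+The worker executor drafts an answer, the reviewer executor escalates every
+draft to a human via ctx.request_info, and the hosting adapter exposes the
+resulting pause/resume cycle through the /responses endpoint.
+"""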
+ +import asyncio +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential +from dotenv import load_dotenv + +from agent_framework import ( # noqa: E402 + Executor, + InMemoryCheckpointStorage, + WorkflowAgent, + WorkflowBuilder, + WorkflowContext, + handler, + response_handler, +) +from workflow_as_agent_reflection_pattern import ( # noqa: E402 + ReviewRequest, + ReviewResponse, + Worker, +) + +from azure.ai.agentserver.agentframework import from_agent_framework + +load_dotenv() + +@dataclass +class HumanReviewRequest: + """A request message type for escalation to a human reviewer.""" + + agent_request: ReviewRequest | None = None + + def convert_to_payload(self) -> str: + """Convert the HumanReviewRequest to a payload string.""" + request = self.agent_request + payload: dict[str, Any] = {"agent_request": None} + + if request: + payload["agent_request"] = { + "request_id": request.request_id, + "user_messages": [msg.to_dict() for msg in request.user_messages], + "agent_messages": [msg.to_dict() for msg in request.agent_messages], + } + + return json.dumps(payload, indent=2) + + +class ReviewerWithHumanInTheLoop(Executor): + """Executor that always escalates reviews to a human manager.""" + + def __init__(self, worker_id: str, reviewer_id: str | None = None) -> None: + unique_id = reviewer_id or f"{worker_id}-reviewer" + super().__init__(id=unique_id) + self._worker_id = worker_id + + @handler + async def review(self, request: ReviewRequest, ctx: WorkflowContext) -> None: + # In this simplified example, we always escalate to a human manager. + # See workflow_as_agent_reflection.py for an implementation + # using an automated agent to make the review decision. + print(f"Reviewer: Evaluating response for request {request.request_id[:8]}...") + print("Reviewer: Escalating to human manager...") + + # Forward the request to a human manager by sending a HumanReviewRequest. + await ctx.request_info( + request_data=HumanReviewRequest(agent_request=request), + response_type=ReviewResponse, + ) + + @response_handler + async def accept_human_review( + self, + original_request: HumanReviewRequest, + response: ReviewResponse, + ctx: WorkflowContext[ReviewResponse], + ) -> None: + # Accept the human review response and forward it back to the Worker. + print(f"Reviewer: Accepting human review for request {response.request_id[:8]}...") + print(f"Reviewer: Human feedback: {response.feedback}") + print(f"Reviewer: Human approved: {response.approved}") + print("Reviewer: Forwarding human review back to worker...") + await ctx.send_message(response, target_id=self._worker_id) + + +def build_agent(): + # Build a workflow with bidirectional communication between Worker and Reviewer, + # and escalation paths for human review. 
+ agent = ( + WorkflowBuilder() + .register_executor( + lambda: Worker( + id="sub-worker", + chat_client=AzureOpenAIChatClient(credential=AzureCliCredential()), + ), + name="worker", + ) + .register_executor( + lambda: ReviewerWithHumanInTheLoop(worker_id="sub-worker"), + name="reviewer", + ) + .add_edge("worker", "reviewer") # Worker sends requests to Reviewer + .add_edge("reviewer", "worker") # Reviewer sends feedback to Worker + .set_start_executor("worker") + .build() + .as_agent() # Convert workflow into an agent interface + ) + return agent + + +async def run_agent() -> None: + """Run the workflow inside the agent server adapter.""" + agent = build_agent() + checkpoint_storage = InMemoryCheckpointStorage() + await from_agent_framework(agent).run_async() + +if __name__ == "__main__": + asyncio.run(run_agent()) \ No newline at end of file diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/requirements.txt b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/requirements.txt new file mode 100644 index 000000000000..c044abf99eb1 --- /dev/null +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/requirements.txt @@ -0,0 +1,5 @@ +python-dotenv>=1.0.0 +azure-identity +agent-framework-azure-ai +azure-ai-agentserver-core +azure-ai-agentserver-agentframework diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py new file mode 100644 index 000000000000..168d90cdd93d --- /dev/null +++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop/workflow_as_agent_reflection_pattern.py @@ -0,0 +1,125 @@ +# Copyright (c) Microsoft. All rights reserved. + +from dataclasses import dataclass +import json +from uuid import uuid4 + +from agent_framework import ( + AgentRunResponseUpdate, + AgentRunUpdateEvent, + ChatClientProtocol, + ChatMessage, + Contents, + Executor, + Role, + WorkflowContext, + handler, +) + +@dataclass +class ReviewRequest: + """Structured request passed from Worker to Reviewer for evaluation.""" + + request_id: str + user_messages: list[ChatMessage] + agent_messages: list[ChatMessage] + + +@dataclass +class ReviewResponse: + """Structured response from Reviewer back to Worker.""" + + request_id: str + feedback: str + approved: bool + + @staticmethod + def convert_from_payload(payload: str) -> "ReviewResponse": + """Convert a JSON payload string to a ReviewResponse instance.""" + data = json.loads(payload) + return ReviewResponse( + request_id=data["request_id"], + feedback=data["feedback"], + approved=data["approved"], + ) + + +PendingReviewState = tuple[ReviewRequest, list[ChatMessage]] + + +class Worker(Executor): + """Executor that generates responses and incorporates feedback when necessary.""" + + def __init__(self, id: str, chat_client: ChatClientProtocol) -> None: + super().__init__(id=id) + self._chat_client = chat_client + self._pending_requests: dict[str, PendingReviewState] = {} + + @handler + async def handle_user_messages(self, user_messages: list[ChatMessage], ctx: WorkflowContext[ReviewRequest]) -> None: + print("Worker: Received user messages, generating response...") + + # Initialize chat with system prompt. 
+ messages = [ChatMessage(role=Role.SYSTEM, text="You are a helpful assistant.")] + messages.extend(user_messages) + + print("Worker: Calling LLM to generate response...") + response = await self._chat_client.get_response(messages=messages) + print(f"Worker: Response generated: {response.messages[-1].text}") + + # Add agent messages to context. + messages.extend(response.messages) + + # Create review request and send to Reviewer. + request = ReviewRequest(request_id=str(uuid4()), user_messages=user_messages, agent_messages=response.messages) + print(f"Worker: Sending response for review (ID: {request.request_id[:8]})") + await ctx.send_message(request) + + # Track request for possible retry. + self._pending_requests[request.request_id] = (request, messages) + + @handler + async def handle_review_response(self, review: ReviewResponse, ctx: WorkflowContext[ReviewRequest]) -> None: + print(f"Worker: Received review for request {review.request_id[:8]} - Approved: {review.approved}") + + if review.request_id not in self._pending_requests: + raise ValueError(f"Unknown request ID in review: {review.request_id}") + + request, messages = self._pending_requests.pop(review.request_id) + + if review.approved: + print("Worker: Response approved. Emitting to external consumer...") + contents: list[Contents] = [] + for message in request.agent_messages: + contents.extend(message.contents) + + # Emit approved result to external consumer via AgentRunUpdateEvent. + await ctx.add_event( + AgentRunUpdateEvent(self.id, data=AgentRunResponseUpdate(contents=contents, role=Role.ASSISTANT)) + ) + return + + print(f"Worker: Response not approved. Feedback: {review.feedback}") + print("Worker: Regenerating response with feedback...") + + # Incorporate review feedback. + messages.append(ChatMessage(role=Role.SYSTEM, text=review.feedback)) + messages.append( + ChatMessage(role=Role.SYSTEM, text="Please incorporate the feedback and regenerate the response.") + ) + messages.extend(request.user_messages) + + # Retry with updated prompt. + response = await self._chat_client.get_response(messages=messages) + print(f"Worker: New response generated: {response.messages[-1].text}") + + messages.extend(response.messages) + + # Send updated request for re-review. + new_request = ReviewRequest( + request_id=review.request_id, user_messages=request.user_messages, agent_messages=response.messages + ) + await ctx.send_message(new_request) + + # Track new request for further evaluation. 
+        self._pending_requests[new_request.request_id] = (new_request, messages)
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/.envtemplate b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/.envtemplate
new file mode 100644
index 000000000000..bd646f163bb7
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/.envtemplate
@@ -0,0 +1,3 @@
+AZURE_OPENAI_ENDPOINT=https://<your-resource-name>.cognitiveservices.azure.com/
+OPENAI_API_VERSION=2025-03-01-preview
+AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-chat-deployment-name>
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/README.md b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/README.md
new file mode 100644
index 000000000000..64f19cefcbcb
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/README.md
@@ -0,0 +1,45 @@
+# Human-in-the-Loop AI Function Sample
+
+This sample demonstrates how to use the agents hosting adapter with a Microsoft Agent Framework agent whose `add_to_calendar` tool requires human approval.
+
+## Prerequisites
+
+> **Azure sign-in:** Run `az login` before starting the sample so `DefaultAzureCredential` can acquire a CLI token.
+
+### Environment Variables
+
+Copy `.envtemplate` to `.env` and supply:
+
+```
+AZURE_OPENAI_ENDPOINT=https://<your-resource-name>.cognitiveservices.azure.com/
+OPENAI_API_VERSION=2025-03-01-preview
+AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-chat-deployment-name>
+```
+
+## Running the Sample
+
+Follow these steps from this folder:
+
+1) Start the agent server (defaults to 0.0.0.0:8088):
+
+```bash
+python main.py
+```
+
+2) Send a non-streaming request (returns a single JSON response):
+
+```bash
+curl -sS \
+  -H "Content-Type: application/json" \
+  -X POST http://localhost:8088/responses \
+  -d "{\"input\":\"Add a dentist appointment on March 15th\",\"stream\":false}"
+```
+
+3) Send a streaming request (server-sent events). Use -N to disable curl buffering:
+
+```bash
+curl -N \
+  -H "Content-Type: application/json" \
+  -X POST http://localhost:8088/responses \
+  -d "{\"input\":\"Add a team lunch on Friday\",\"stream\":true}"
+```
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/main.py b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/main.py
new file mode 100644
index 000000000000..483919a436cb
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/main.py
@@ -0,0 +1,138 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+from typing import Annotated, Any, Collection
+from dotenv import load_dotenv
+
+load_dotenv()
+
+from agent_framework import ChatAgent, ChatMessage, ChatMessageStoreProtocol, FunctionResultContent, ai_function
+from agent_framework._threads import ChatMessageStoreState
+from agent_framework._types import UserInputRequestContents
+from agent_framework.azure import AzureOpenAIChatClient
+
+from azure.ai.agentserver.agentframework import from_agent_framework
+
+"""
+Tool Approvals with Threads
+
+This sample demonstrates using tool approvals with threads.
+With threads, you don't need to manually pass previous messages -
+the thread stores and retrieves them automatically.
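+
+The add_to_calendar tool below is declared with approval_mode="always_require",
+so every run pauses with a user-input request until an approval decision is
+sent back on the same thread.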
+""" + + +class CustomChatMessageStore(ChatMessageStoreProtocol): + """Implementation of custom chat message store. + In real applications, this can be an implementation of relational database or vector store.""" + + def __init__(self, messages: Collection[ChatMessage] | None = None) -> None: + self._messages: list[ChatMessage] = [] + if messages: + self._messages.extend(messages) + + async def add_messages(self, messages: Collection[ChatMessage]) -> None: + self._messages.extend(messages) + + async def list_messages(self) -> list[ChatMessage]: + return self._messages + + @classmethod + async def deserialize(cls, serialized_store_state: Any, **kwargs: Any) -> "CustomChatMessageStore": + """Create a new instance from serialized state.""" + store = cls() + await store.update_from_state(serialized_store_state, **kwargs) + return store + + async def update_from_state(self, serialized_store_state: Any, **kwargs: Any) -> None: + """Update this instance from serialized state.""" + if serialized_store_state: + state = ChatMessageStoreState.from_dict(serialized_store_state, **kwargs) + if state.messages: + self._messages.extend(state.messages) + + async def serialize(self, **kwargs: Any) -> Any: + """Serialize this store's state.""" + state = ChatMessageStoreState(messages=self._messages) + return state.to_dict(**kwargs) + + +@ai_function(approval_mode="always_require") +def add_to_calendar( + event_name: Annotated[str, "Name of the event"], date: Annotated[str, "Date of the event"] +) -> str: + """Add an event to the calendar (requires approval).""" + print(f">>> EXECUTING: add_to_calendar(event_name='{event_name}', date='{date}')") + return f"Added '{event_name}' to calendar on {date}" + + +def build_agent(): + return ChatAgent( + chat_client=AzureOpenAIChatClient(), + name="CalendarAgent", + instructions="You are a helpful calendar assistant.", + tools=[add_to_calendar], + chat_message_store_factory=CustomChatMessageStore, + ) + +async def run_agent() -> None: + """Example showing approval with threads.""" + print("=== Tool Approval with Thread ===\n") + + agent = build_agent() + + thread = agent.get_new_thread() + thread_id = thread.service_thread_id + # Step 1: Agent requests to call the tool + query = "Add a dentist appointment on March 15th" + print(f"User: {query}") + result = await agent.run(query, thread=thread) + serialized_thread = await thread.serialize() + print(f"Agent: {result.to_dict()}") + print(f"Thread: {serialized_thread}\n\n") + + resume_thread = await agent.deserialize_thread(serialized_thread) + res = await resume_thread.message_store.list_messages() + print(f"Resumed thread messages: {res}") + for message in res: + print(f" Thread message {type(message)}: {message.to_dict()}") + for content in message.contents: + print(f" Content {type(content)}: {content.to_dict()}") + + # Check for approval requests + if result.user_input_requests: + for request in result.user_input_requests: + print("\nApproval needed:") + print(f" Function: {request.function_call.name}") + print(f" Arguments: {request.function_call.arguments}") + print(f" type: {type(request.function_call)}") + print(f" function arg type: {type(request.function_call.arguments)}") + + # User approves (in real app, this would be user input) + approved = True # Change to False to see rejection + print(f" Decision: {'Approved' if approved else 'Rejected'}") + + # Step 2: Send approval response + # approval_response = request.create_response(approved=approved) + #response_message = ChatMessage(role="user", 
contents=[approval_response])
+            approval_response = FunctionResultContent(
+                call_id=request.function_call.call_id,
+                result="approved" if approved else "denied",
+            )
+            response_message = ChatMessage(role="tool", contents=[approval_response])
+            result = await agent.run(response_message, thread=resume_thread)
+
+            print(f"Agent: {result}\n")
+
+
+async def main() -> None:
+    agent = build_agent()
+    await from_agent_framework(agent).run_async()
+
+if __name__ == "__main__":
+    asyncio.run(main())
+    # asyncio.run(run_agent())
\ No newline at end of file
diff --git a/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt
new file mode 100644
index 000000000000..c044abf99eb1
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-agentframework/samples/human_in_the_loop_ai_function/requirements.txt
@@ -0,0 +1,5 @@
+python-dotenv>=1.0.0
+azure-identity
+agent-framework-azure-ai
+azure-ai-agentserver-core
+azure-ai-agentserver-agentframework
diff --git a/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/server/common/constants.py b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/server/common/constants.py
new file mode 100644
index 000000000000..7d21ee7a31ff
--- /dev/null
+++ b/sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/server/common/constants.py
@@ -0,0 +1,6 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# Reserved function name for HITL.
+HUMAN_IN_THE_LOOP_FUNCTION_NAME = "__hosted_agent_adapter_hitl__"
\ No newline at end of file