diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py index 0aa93f8b..d135d866 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/_start_agent_process.py @@ -1,6 +1,12 @@ from typing import Optional + from aiohttp.web import Request, Response -from microsoft_agents.hosting.core.app import AgentApplication + +from microsoft_agents.hosting.core import ( + AgentApplication, + start_agent_process as core_start_agent_process, +) + from .cloud_adapter import CloudAdapter @@ -9,18 +15,5 @@ async def start_agent_process( agent_application: AgentApplication, adapter: CloudAdapter, ) -> Optional[Response]: - """Starts the agent host with the provided adapter and agent application. - Args: - adapter (CloudAdapter): The adapter to use for the agent host. - agent_application (AgentApplication): The agent application to run. - """ - if not adapter: - raise TypeError("start_agent_process: adapter can't be None") - if not agent_application: - raise TypeError("start_agent_process: agent_application can't be None") - - # Start the agent application with the provided adapter - return await adapter.process( - request, - agent_application, - ) + """Starts the agent host with the provided adapter and agent application.""" + return await core_start_agent_process(request, agent_application, adapter) diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py index a0351aa0..8896f8f3 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/agent_http_adapter.py @@ -1,18 +1,12 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from abc import abstractmethod -from typing import Optional, Protocol +from typing import Protocol -from aiohttp.web import ( - Request, - Response, -) +from aiohttp.web import Request, Response -from microsoft_agents.hosting.core import Agent +from microsoft_agents.hosting.core import AgentHttpAdapterProtocol -class AgentHttpAdapter(Protocol): - @abstractmethod - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - raise NotImplementedError() +class AgentHttpAdapter(AgentHttpAdapterProtocol[Request, Response], Protocol): + """Framework specific alias for the shared AgentHttpAdapter protocol.""" diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py index 4a8a193b..bace245b 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/channel_service_route_table.py @@ -1,102 +1,80 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. import json -from typing import List, Union, Type +from typing import Any -from aiohttp.web import RouteTableDef, Request, Response +from aiohttp.web import HTTPUnsupportedMediaType, Request, Response, RouteTableDef -from microsoft_agents.activity import ( - AgentsModel, - Activity, - AttachmentData, - ConversationParameters, - Transcript, +from microsoft_agents.hosting.core import ( + ChannelApiHandlerProtocol, + ChannelServiceOperations, + serialize_agents_model, ) -from microsoft_agents.hosting.core import ChannelApiHandlerProtocol -async def deserialize_from_body( - request: Request, target_model: Type[AgentsModel] -) -> Activity: - if "application/json" in request.headers["Content-Type"]: - body = await request.json() - else: - return Response(status=415) +async def _read_payload(request: Request) -> Any: + if "application/json" not in request.headers.get("Content-Type", ""): + raise HTTPUnsupportedMediaType() + return await request.json() - return target_model.model_validate(body) +def _json_response(result: Any) -> Response: + if result is None: + return Response() -def get_serialized_response( - model_or_list: Union[AgentsModel, List[AgentsModel]], -) -> Response: - if isinstance(model_or_list, AgentsModel): - json_obj = model_or_list.model_dump( - mode="json", exclude_unset=True, by_alias=True - ) - else: - json_obj = [ - model.model_dump(mode="json", exclude_unset=True, by_alias=True) - for model in model_or_list - ] - - return Response(body=json.dumps(json_obj), content_type="application/json") + payload = serialize_agents_model(result) + return Response(body=json.dumps(payload), content_type="application/json") def channel_service_route_table( handler: ChannelApiHandlerProtocol, base_url: str = "" ) -> RouteTableDef: - # pylint: disable=unused-variable routes = RouteTableDef() + operations = ChannelServiceOperations(handler) @routes.post(base_url + "/v3/conversations/{conversation_id}/activities") async def send_to_conversation(request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_send_to_conversation( + payload = await _read_payload(request) + result = await operations.send_to_conversation( request.get("claims_identity"), request.match_info["conversation_id"], - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.post( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def reply_to_activity(request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_reply_to_activity( + payload = await _read_payload(request) + result = await operations.reply_to_activity( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.put( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def update_activity(request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_update_activity( + payload = await _read_payload(request) + result = await operations.update_activity( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.delete( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def delete_activity(request: Request): - await handler.on_delete_activity( + await operations.delete_activity( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], ) - return Response() @routes.get( @@ -104,91 +82,82 @@ async def delete_activity(request: Request): + "/v3/conversations/{conversation_id}/activities/{activity_id}/members" ) async def get_activity_members(request: Request): - result = await handler.on_get_activity_members( + result = await operations.get_activity_members( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["activity_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.post(base_url + "/") async def create_conversation(request: Request): - conversation_parameters = deserialize_from_body(request, ConversationParameters) - result = await handler.on_create_conversation( - request.get("claims_identity"), conversation_parameters + payload = await _read_payload(request) + result = await operations.create_conversation( + request.get("claims_identity"), + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.get(base_url + "/") async def get_conversation(request: Request): - # TODO: continuation token? conversation_id? - result = await handler.on_get_conversations( - request.get("claims_identity"), None + result = await operations.get_conversations( + request.get("claims_identity"), + None, ) - - return get_serialized_response(result) + return _json_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/members") async def get_conversation_members(request: Request): - result = await handler.on_get_conversation_members( + result = await operations.get_conversation_members( request.get("claims_identity"), request.match_info["conversation_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def get_conversation_member(request: Request): - result = await handler.on_get_conversation_member( + result = await operations.get_conversation_member( request.get("claims_identity"), request.match_info["member_id"], request.match_info["conversation_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.get(base_url + "/v3/conversations/{conversation_id}/pagedmembers") async def get_conversation_paged_members(request: Request): - # TODO: continuation token? page size? - result = await handler.on_get_conversation_paged_members( + result = await operations.get_conversation_paged_members( request.get("claims_identity"), request.match_info["conversation_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.delete(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def delete_conversation_member(request: Request): - result = await handler.on_delete_conversation_member( + result = await operations.delete_conversation_member( request.get("claims_identity"), request.match_info["conversation_id"], request.match_info["member_id"], ) - - return get_serialized_response(result) + return _json_response(result) @routes.post(base_url + "/v3/conversations/{conversation_id}/activities/history") async def send_conversation_history(request: Request): - transcript = deserialize_from_body(request, Transcript) - result = await handler.on_send_conversation_history( + payload = await _read_payload(request) + result = await operations.send_conversation_history( request.get("claims_identity"), request.match_info["conversation_id"], - transcript, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @routes.post(base_url + "/v3/conversations/{conversation_id}/attachments") async def upload_attachment(request: Request): - attachment_data = deserialize_from_body(request, AttachmentData) - result = await handler.on_upload_attachment( + payload = await _read_payload(request) + result = await operations.upload_attachment( request.get("claims_identity"), request.match_info["conversation_id"], - attachment_data, + payload, ) - - return get_serialized_response(result) + return _json_response(result) return routes diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py index 1ef106c3..ae1a3bee 100644 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/cloud_adapter.py @@ -1,118 +1,67 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -from traceback import format_exc -from typing import Optional +from typing import Any, Optional from aiohttp.web import ( - Request, - Response, - json_response, HTTPBadRequest, HTTPMethodNotAllowed, HTTPUnauthorized, HTTPUnsupportedMediaType, + Request, + Response, + json_response, ) -from microsoft_agents.hosting.core.authorization import ( - ClaimsIdentity, - Connections, -) -from microsoft_agents.activity import ( - Activity, - DeliveryModes, -) + from microsoft_agents.hosting.core import ( - Agent, - ChannelServiceAdapter, + CloudAdapterBase, + Connections, ChannelServiceClientFactoryBase, - MessageFactory, - RestChannelServiceClientFactory, - TurnContext, ) +from microsoft_agents.hosting.core.authorization import ClaimsIdentity from .agent_http_adapter import AgentHttpAdapter -class CloudAdapter(ChannelServiceAdapter, AgentHttpAdapter): +class CloudAdapter(CloudAdapterBase[Request, Response], AgentHttpAdapter): def __init__( self, *, - connection_manager: Connections = None, - channel_service_client_factory: ChannelServiceClientFactoryBase = None, - ): - """ - Initializes a new instance of the CloudAdapter class. - - :param channel_service_client_factory: The factory to use to create the channel service client. - """ - - async def on_turn_error(context: TurnContext, error: Exception): - error_message = f"Exception caught : {error}" - print(format_exc()) - - await context.send_activity(MessageFactory.text(error_message)) - - # Send a trace activity - await context.send_trace_activity( - "OnTurnError Trace", - error_message, - "https://www.botframework.com/schemas/error", - "TurnError", - ) - - self.on_turn_error = on_turn_error - - channel_service_client_factory = ( - channel_service_client_factory - or RestChannelServiceClientFactory(connection_manager) + connection_manager: Connections | None = None, + channel_service_client_factory: ChannelServiceClientFactoryBase | None = None, + ) -> None: + super().__init__( + connection_manager=connection_manager, + channel_service_client_factory=channel_service_client_factory, ) - super().__init__(channel_service_client_factory) + def _get_method(self, request: Request) -> str: + return request.method - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - if not request: - raise TypeError("CloudAdapter.process: request can't be None") - if not agent: - raise TypeError("CloudAdapter.process: agent can't be None") + async def _read_json_body(self, request: Request) -> Any: + if "application/json" not in request.headers.get("Content-Type", ""): + raise self._unsupported_media_type_error(request) + return await request.json() - if request.method == "POST": - # Deserialize the incoming Activity - if "application/json" in request.headers["Content-Type"]: - body = await request.json() - else: - raise HTTPUnsupportedMediaType() + def _get_claims_identity(self, request: Request) -> ClaimsIdentity | None: + return request.get("claims_identity", self._default_claims_identity()) - activity: Activity = Activity.model_validate(body) + def _method_not_allowed_error(self, request: Request) -> Exception: + return HTTPMethodNotAllowed(request.method, ["POST"]) - # default to anonymous identity with no claims - claims_identity: ClaimsIdentity = request.get( - "claims_identity", ClaimsIdentity({}, False) - ) + def _unsupported_media_type_error(self, request: Request) -> Exception: + return HTTPUnsupportedMediaType() - # A POST request must contain an Activity - if ( - not activity.type - or not activity.conversation - or not activity.conversation.id - ): - raise HTTPBadRequest + def _bad_request_error(self, request: Request) -> Exception: + return HTTPBadRequest() - try: - # Process the inbound activity with the agent - invoke_response = await self.process_activity( - claims_identity, activity, agent.on_turn - ) + def _unauthorized_error(self, request: Request) -> Exception: + return HTTPUnauthorized() - if ( - activity.type == "invoke" - or activity.delivery_mode == DeliveryModes.expect_replies - ): - # Invoke and ExpectReplies cannot be performed async, the response must be written before the calling thread is released. - return json_response( - data=invoke_response.body, status=invoke_response.status - ) + def _create_invoke_response( + self, request: Request, invoke_response: Any + ) -> Response: + return json_response( + data=invoke_response.body, + status=invoke_response.status, + ) - return Response(status=202) - except PermissionError: - raise HTTPUnauthorized - else: - raise HTTPMethodNotAllowed + def _create_accepted_response(self, request: Request) -> Response: + return Response(status=202) diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py index 90d6f0ec..5ae2c23e 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/__init__.py @@ -9,6 +9,14 @@ from .middleware_set import Middleware from .rest_channel_service_client_factory import RestChannelServiceClientFactory from .turn_context import TurnContext +from .http import ( + AgentHttpAdapterProtocol, + ChannelServiceOperations, + CloudAdapterBase, + parse_agents_model, + serialize_agents_model, + start_agent_process, +) # Application Style from .app._type_defs import RouteHandler, RouteSelector, StateT @@ -96,6 +104,12 @@ "Middleware", "RestChannelServiceClientFactory", "TurnContext", + "AgentHttpAdapterProtocol", + "ChannelServiceOperations", + "CloudAdapterBase", + "parse_agents_model", + "serialize_agents_model", + "start_agent_process", "AgentApplication", "ApplicationError", "ApplicationOptions", diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py new file mode 100644 index 00000000..abfe8841 --- /dev/null +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/http.py @@ -0,0 +1,360 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +"""Shared HTTP hosting utilities used by framework-specific adapters.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Iterable +from traceback import format_exc +from typing import Any, Generic, Optional, Protocol, Type, TypeVar + +from microsoft_agents.activity import ( + Activity, + AgentsModel, + AttachmentData, + ConversationParameters, + DeliveryModes, + Transcript, +) + +from .authorization.claims_identity import ClaimsIdentity +from .authorization.connections import Connections +from .channel_api_handler_protocol import ChannelApiHandlerProtocol +from .channel_service_adapter import ChannelServiceAdapter +from .channel_service_client_factory_base import ChannelServiceClientFactoryBase +from .message_factory import MessageFactory +from .rest_channel_service_client_factory import RestChannelServiceClientFactory +from .turn_context import TurnContext + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: # pragma: no cover - imported for type checking only + from microsoft_agents.hosting.core.agent import Agent + from microsoft_agents.hosting.core.app.agent_application import AgentApplication + + +TModel = TypeVar("TModel", bound=AgentsModel) +RequestT = TypeVar("RequestT") +ResponseT = TypeVar("ResponseT") + + +class AgentHttpAdapterProtocol(Protocol, Generic[RequestT, ResponseT]): + """Protocol describing the contract for framework specific HTTP adapters.""" + + async def process(self, request: RequestT, agent: "Agent") -> Optional[ResponseT]: + raise NotImplementedError + + +async def start_agent_process( + request: RequestT, + agent_application: "AgentApplication", + adapter: AgentHttpAdapterProtocol[RequestT, ResponseT], +) -> Optional[ResponseT]: + """Start the agent process using the provided adapter and application.""" + if adapter is None: + raise TypeError("start_agent_process: adapter can't be None") + if agent_application is None: + raise TypeError("start_agent_process: agent_application can't be None") + + return await adapter.process(request, agent_application) + + +def parse_agents_model(payload: Any, model_type: Type[TModel]) -> TModel: + """Parse a payload into the requested AgentsModel derived type.""" + return model_type.model_validate(payload) + + +def serialize_agents_model(model_or_list: AgentsModel | Iterable[AgentsModel]) -> Any: + """Serialize AgentsModel instances into JSON serialisable structures.""" + if isinstance(model_or_list, AgentsModel): + return model_or_list.model_dump(mode="json", exclude_unset=True, by_alias=True) + + return [ + model.model_dump(mode="json", exclude_unset=True, by_alias=True) + for model in model_or_list + ] + + +class ChannelServiceOperations: + """Shared activity channel operations used by HTTP frameworks.""" + + def __init__(self, handler: ChannelApiHandlerProtocol) -> None: + self._handler = handler + + async def send_to_conversation( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_payload: Any, + ): + activity = parse_agents_model(activity_payload, Activity) + return await self._handler.on_send_to_conversation( + claims_identity, + conversation_id, + activity, + ) + + async def reply_to_activity( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_id: str, + activity_payload: Any, + ): + activity = parse_agents_model(activity_payload, Activity) + return await self._handler.on_reply_to_activity( + claims_identity, + conversation_id, + activity_id, + activity, + ) + + async def update_activity( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_id: str, + activity_payload: Any, + ): + activity = parse_agents_model(activity_payload, Activity) + return await self._handler.on_update_activity( + claims_identity, + conversation_id, + activity_id, + activity, + ) + + async def delete_activity( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_id: str, + ) -> None: + await self._handler.on_delete_activity( + claims_identity, + conversation_id, + activity_id, + ) + + async def get_activity_members( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + activity_id: str, + ): + return await self._handler.on_get_activity_members( + claims_identity, + conversation_id, + activity_id, + ) + + async def create_conversation( + self, + claims_identity: ClaimsIdentity, + parameters_payload: Any, + ): + parameters = parse_agents_model(parameters_payload, ConversationParameters) + return await self._handler.on_create_conversation( + claims_identity, + parameters, + ) + + async def get_conversations( + self, + claims_identity: ClaimsIdentity, + conversation_id: str | None = None, + ): + return await self._handler.on_get_conversations( + claims_identity, + conversation_id, + ) + + async def get_conversation_members( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + ): + return await self._handler.on_get_conversation_members( + claims_identity, + conversation_id, + ) + + async def get_conversation_member( + self, + claims_identity: ClaimsIdentity, + user_id: str, + conversation_id: str, + ): + return await self._handler.on_get_conversation_member( + claims_identity, + user_id, + conversation_id, + ) + + async def get_conversation_paged_members( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + page_size: int | None = None, + continuation_token: str | None = None, + ): + return await self._handler.on_get_conversation_paged_members( + claims_identity, + conversation_id, + page_size, + continuation_token, + ) + + async def delete_conversation_member( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + member_id: str, + ): + return await self._handler.on_delete_conversation_member( + claims_identity, + conversation_id, + member_id, + ) + + async def send_conversation_history( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + transcript_payload: Any, + ): + transcript = parse_agents_model(transcript_payload, Transcript) + return await self._handler.on_send_conversation_history( + claims_identity, + conversation_id, + transcript, + ) + + async def upload_attachment( + self, + claims_identity: ClaimsIdentity, + conversation_id: str, + attachment_payload: Any, + ): + attachment = parse_agents_model(attachment_payload, AttachmentData) + return await self._handler.on_upload_attachment( + claims_identity, + conversation_id, + attachment, + ) + + +class CloudAdapterBase(ChannelServiceAdapter, Generic[RequestT, ResponseT], ABC): + """Base implementation for framework specific CloudAdapter implementations.""" + + def __init__( + self, + *, + connection_manager: Connections | None = None, + channel_service_client_factory: ChannelServiceClientFactoryBase | None = None, + ) -> None: + async def on_turn_error(context: TurnContext, error: Exception) -> None: + error_message = f"Exception caught : {error}" + print(format_exc()) + + await context.send_activity(MessageFactory.text(error_message)) + + await context.send_trace_activity( + "OnTurnError Trace", + error_message, + "https://www.botframework.com/schemas/error", + "TurnError", + ) + + self.on_turn_error = on_turn_error + + factory = channel_service_client_factory or RestChannelServiceClientFactory( + connection_manager + ) + + super().__init__(factory) + + async def process( + self, request: RequestT, agent: "Agent" + ) -> Optional[ResponseT]: # pragma: no cover - exercised via subclasses + if request is None: + raise TypeError("CloudAdapter.process: request can't be None") + if agent is None: + raise TypeError("CloudAdapter.process: agent can't be None") + + if self._get_method(request) != "POST": + raise self._method_not_allowed_error(request) + + body = await self._read_json_body(request) + activity: Activity = Activity.model_validate(body) + + claims_identity = self._get_claims_identity(request) + if not claims_identity: + claims_identity = self._default_claims_identity() + + if ( + not activity.type + or not activity.conversation + or not activity.conversation.id + ): + raise self._bad_request_error(request) + + try: + invoke_response = await self.process_activity( + claims_identity, + activity, + agent.on_turn, + ) + except PermissionError as error: + raise self._unauthorized_error(request) from error + + if ( + activity.type == "invoke" + or activity.delivery_mode == DeliveryModes.expect_replies + ): + return self._create_invoke_response(request, invoke_response) + + return self._create_accepted_response(request) + + def _default_claims_identity(self) -> ClaimsIdentity: + return ClaimsIdentity({}, False) + + @abstractmethod + def _get_method(self, request: RequestT) -> str: + """Return the HTTP method for the incoming request.""" + + @abstractmethod + async def _read_json_body(self, request: RequestT) -> Any: + """Read and return the JSON payload.""" + + @abstractmethod + def _get_claims_identity(self, request: RequestT) -> ClaimsIdentity | None: + """Extract the claims identity from the request.""" + + @abstractmethod + def _method_not_allowed_error(self, request: RequestT) -> Exception: + """Return the exception raised when the request method is unsupported.""" + + @abstractmethod + def _unsupported_media_type_error(self, request: RequestT) -> Exception: + """Return the exception raised when the content type is unsupported.""" + + @abstractmethod + def _bad_request_error(self, request: RequestT) -> Exception: + """Return the exception raised when the request payload is invalid.""" + + @abstractmethod + def _unauthorized_error(self, request: RequestT) -> Exception: + """Return the exception raised when authorization fails.""" + + @abstractmethod + def _create_invoke_response( + self, request: RequestT, invoke_response: Any + ) -> ResponseT: + """Create the framework specific response for invoke results.""" + + @abstractmethod + def _create_accepted_response(self, request: RequestT) -> ResponseT: + """Create the framework specific HTTP 202 response.""" diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py index 13396ca8..7d87573e 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/_start_agent_process.py @@ -1,6 +1,12 @@ from typing import Optional + from fastapi import Request, Response -from microsoft_agents.hosting.core.app import AgentApplication + +from microsoft_agents.hosting.core import ( + AgentApplication, + start_agent_process as core_start_agent_process, +) + from .cloud_adapter import CloudAdapter @@ -9,18 +15,5 @@ async def start_agent_process( agent_application: AgentApplication, adapter: CloudAdapter, ) -> Optional[Response]: - """Starts the agent host with the provided adapter and agent application. - Args: - adapter (CloudAdapter): The adapter to use for the agent host. - agent_application (AgentApplication): The agent application to run. - """ - if not adapter: - raise TypeError("start_agent_process: adapter can't be None") - if not agent_application: - raise TypeError("start_agent_process: agent_application can't be None") - - # Start the agent application with the provided adapter - return await adapter.process( - request, - agent_application, - ) + """Starts the agent host with the provided adapter and agent application.""" + return await core_start_agent_process(request, agent_application, adapter) diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py index 2584b272..f1a08425 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/agent_http_adapter.py @@ -1,15 +1,12 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from abc import abstractmethod -from typing import Optional, Protocol +from typing import Protocol from fastapi import Request, Response -from microsoft_agents.hosting.core import Agent +from microsoft_agents.hosting.core import AgentHttpAdapterProtocol -class AgentHttpAdapter(Protocol): - @abstractmethod - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - raise NotImplementedError() +class AgentHttpAdapter(AgentHttpAdapterProtocol[Request, Response], Protocol): + """Framework specific alias for the shared AgentHttpAdapter protocol.""" diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py index 2dd009fc..e55c9076 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/channel_service_route_table.py @@ -1,64 +1,44 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -import json -from typing import List, Union, Type +from typing import Any -from fastapi import APIRouter, Request, Response, HTTPException, Depends +from fastapi import APIRouter, HTTPException, Request, Response from fastapi.responses import JSONResponse -from microsoft_agents.activity import ( - AgentsModel, - Activity, - AttachmentData, - ConversationParameters, - Transcript, +from microsoft_agents.hosting.core import ( + ChannelApiHandlerProtocol, + ChannelServiceOperations, + serialize_agents_model, ) -from microsoft_agents.hosting.core import ChannelApiHandlerProtocol -async def deserialize_from_body( - request: Request, target_model: Type[AgentsModel] -) -> AgentsModel: - content_type = request.headers.get("Content-Type", "") - if "application/json" in content_type: - body = await request.json() - else: +async def _read_payload(request: Request) -> Any: + if "application/json" not in request.headers.get("Content-Type", ""): raise HTTPException(status_code=415, detail="Unsupported Media Type") + return await request.json() - return target_model.model_validate(body) +def _json_response(result: Any) -> Response: + if result is None: + return Response(status_code=200) -def get_serialized_response( - model_or_list: Union[AgentsModel, List[AgentsModel]], -) -> JSONResponse: - if isinstance(model_or_list, AgentsModel): - json_obj = model_or_list.model_dump( - mode="json", exclude_unset=True, by_alias=True - ) - else: - json_obj = [ - model.model_dump(mode="json", exclude_unset=True, by_alias=True) - for model in model_or_list - ] - - return JSONResponse(content=json_obj) + payload = serialize_agents_model(result) + return JSONResponse(content=payload) def channel_service_route_table( handler: ChannelApiHandlerProtocol, base_url: str = "" ) -> APIRouter: router = APIRouter() + operations = ChannelServiceOperations(handler) @router.post(base_url + "/v3/conversations/{conversation_id}/activities") async def send_to_conversation(conversation_id: str, request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_send_to_conversation( + payload = await _read_payload(request) + result = await operations.send_to_conversation( getattr(request.state, "claims_identity", None), conversation_id, - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.post( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" @@ -66,40 +46,37 @@ async def send_to_conversation(conversation_id: str, request: Request): async def reply_to_activity( conversation_id: str, activity_id: str, request: Request ): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_reply_to_activity( + payload = await _read_payload(request) + result = await operations.reply_to_activity( getattr(request.state, "claims_identity", None), conversation_id, activity_id, - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.put( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def update_activity(conversation_id: str, activity_id: str, request: Request): - activity = await deserialize_from_body(request, Activity) - result = await handler.on_update_activity( + payload = await _read_payload(request) + result = await operations.update_activity( getattr(request.state, "claims_identity", None), conversation_id, activity_id, - activity, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.delete( base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}" ) async def delete_activity(conversation_id: str, activity_id: str, request: Request): - await handler.on_delete_activity( + await operations.delete_activity( getattr(request.state, "claims_identity", None), conversation_id, activity_id, ) - return Response(status_code=200) @router.get( @@ -109,97 +86,86 @@ async def delete_activity(conversation_id: str, activity_id: str, request: Reque async def get_activity_members( conversation_id: str, activity_id: str, request: Request ): - result = await handler.on_get_activity_members( + result = await operations.get_activity_members( getattr(request.state, "claims_identity", None), conversation_id, activity_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.post(base_url + "/") async def create_conversation(request: Request): - conversation_parameters = await deserialize_from_body( - request, ConversationParameters - ) - result = await handler.on_create_conversation( - getattr(request.state, "claims_identity", None), conversation_parameters + payload = await _read_payload(request) + result = await operations.create_conversation( + getattr(request.state, "claims_identity", None), + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.get(base_url + "/") async def get_conversation(request: Request): - # TODO: continuation token? conversation_id? - result = await handler.on_get_conversations( - getattr(request.state, "claims_identity", None), None + result = await operations.get_conversations( + getattr(request.state, "claims_identity", None), + None, ) - - return get_serialized_response(result) + return _json_response(result) @router.get(base_url + "/v3/conversations/{conversation_id}/members") async def get_conversation_members(conversation_id: str, request: Request): - result = await handler.on_get_conversation_members( + result = await operations.get_conversation_members( getattr(request.state, "claims_identity", None), conversation_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.get(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def get_conversation_member( conversation_id: str, member_id: str, request: Request ): - result = await handler.on_get_conversation_member( + result = await operations.get_conversation_member( getattr(request.state, "claims_identity", None), member_id, conversation_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.get(base_url + "/v3/conversations/{conversation_id}/pagedmembers") async def get_conversation_paged_members(conversation_id: str, request: Request): - # TODO: continuation token? page size? - result = await handler.on_get_conversation_paged_members( + result = await operations.get_conversation_paged_members( getattr(request.state, "claims_identity", None), conversation_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.delete(base_url + "/v3/conversations/{conversation_id}/members/{member_id}") async def delete_conversation_member( conversation_id: str, member_id: str, request: Request ): - result = await handler.on_delete_conversation_member( + result = await operations.delete_conversation_member( getattr(request.state, "claims_identity", None), conversation_id, member_id, ) - - return get_serialized_response(result) + return _json_response(result) @router.post(base_url + "/v3/conversations/{conversation_id}/activities/history") async def send_conversation_history(conversation_id: str, request: Request): - transcript = await deserialize_from_body(request, Transcript) - result = await handler.on_send_conversation_history( + payload = await _read_payload(request) + result = await operations.send_conversation_history( getattr(request.state, "claims_identity", None), conversation_id, - transcript, + payload, ) - - return get_serialized_response(result) + return _json_response(result) @router.post(base_url + "/v3/conversations/{conversation_id}/attachments") async def upload_attachment(conversation_id: str, request: Request): - attachment_data = await deserialize_from_body(request, AttachmentData) - result = await handler.on_upload_attachment( + payload = await _read_payload(request) + result = await operations.upload_attachment( getattr(request.state, "claims_identity", None), conversation_id, - attachment_data, + payload, ) - - return get_serialized_response(result) + return _json_response(result) return router diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py index 3383c793..961804d2 100644 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py +++ b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/cloud_adapter.py @@ -1,112 +1,62 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -from traceback import format_exc -from typing import Optional +from typing import Any, Optional -from fastapi import Request, Response, HTTPException +from fastapi import HTTPException, Request, Response from fastapi.responses import JSONResponse -from microsoft_agents.hosting.core.authorization import ( - ClaimsIdentity, - Connections, -) -from microsoft_agents.activity import ( - Activity, - DeliveryModes, -) + from microsoft_agents.hosting.core import ( - Agent, - ChannelServiceAdapter, ChannelServiceClientFactoryBase, - MessageFactory, - RestChannelServiceClientFactory, - TurnContext, + CloudAdapterBase, + Connections, ) +from microsoft_agents.hosting.core.authorization import ClaimsIdentity from .agent_http_adapter import AgentHttpAdapter -class CloudAdapter(ChannelServiceAdapter, AgentHttpAdapter): +class CloudAdapter(CloudAdapterBase[Request, Response], AgentHttpAdapter): def __init__( self, *, - connection_manager: Connections = None, - channel_service_client_factory: ChannelServiceClientFactoryBase = None, - ): - """ - Initializes a new instance of the CloudAdapter class. - - :param channel_service_client_factory: The factory to use to create the channel service client. - """ - - async def on_turn_error(context: TurnContext, error: Exception): - error_message = f"Exception caught : {error}" - print(format_exc()) - - await context.send_activity(MessageFactory.text(error_message)) - - # Send a trace activity - await context.send_trace_activity( - "OnTurnError Trace", - error_message, - "https://www.botframework.com/schemas/error", - "TurnError", - ) - - self.on_turn_error = on_turn_error - - channel_service_client_factory = ( - channel_service_client_factory - or RestChannelServiceClientFactory(connection_manager) + connection_manager: Connections | None = None, + channel_service_client_factory: ChannelServiceClientFactoryBase | None = None, + ) -> None: + super().__init__( + connection_manager=connection_manager, + channel_service_client_factory=channel_service_client_factory, ) - super().__init__(channel_service_client_factory) + def _get_method(self, request: Request) -> str: + return request.method - async def process(self, request: Request, agent: Agent) -> Optional[Response]: - if not request: - raise TypeError("CloudAdapter.process: request can't be None") - if not agent: - raise TypeError("CloudAdapter.process: agent can't be None") + async def _read_json_body(self, request: Request) -> Any: + if "application/json" not in request.headers.get("Content-Type", ""): + raise self._unsupported_media_type_error(request) + return await request.json() - if request.method == "POST": - # Deserialize the incoming Activity - content_type = request.headers.get("Content-Type", "") - if "application/json" in content_type: - body = await request.json() - else: - raise HTTPException(status_code=415, detail="Unsupported Media Type") + def _get_claims_identity(self, request: Request) -> ClaimsIdentity | None: + return getattr( + request.state, "claims_identity", self._default_claims_identity() + ) - activity: Activity = Activity.model_validate(body) + def _method_not_allowed_error(self, request: Request) -> Exception: + return HTTPException(status_code=405, detail="Method Not Allowed") - # default to anonymous identity with no claims - claims_identity: ClaimsIdentity = getattr( - request.state, "claims_identity", ClaimsIdentity({}, False) - ) + def _unsupported_media_type_error(self, request: Request) -> Exception: + return HTTPException(status_code=415, detail="Unsupported Media Type") - # A POST request must contain an Activity - if ( - not activity.type - or not activity.conversation - or not activity.conversation.id - ): - raise HTTPException(status_code=400, detail="Bad Request") + def _bad_request_error(self, request: Request) -> Exception: + return HTTPException(status_code=400, detail="Bad Request") - try: - # Process the inbound activity with the agent - invoke_response = await self.process_activity( - claims_identity, activity, agent.on_turn - ) + def _unauthorized_error(self, request: Request) -> Exception: + return HTTPException(status_code=401, detail="Unauthorized") - if ( - activity.type == "invoke" - or activity.delivery_mode == DeliveryModes.expect_replies - ): - # Invoke and ExpectReplies cannot be performed async, the response must be written before the calling thread is released. - return JSONResponse( - content=invoke_response.body, status_code=invoke_response.status - ) + def _create_invoke_response( + self, request: Request, invoke_response: Any + ) -> Response: + return JSONResponse( + content=invoke_response.body, + status_code=invoke_response.status, + ) - return Response(status_code=202) - except PermissionError: - raise HTTPException(status_code=401, detail="Unauthorized") - else: - raise HTTPException(status_code=405, detail="Method Not Allowed") + def _create_accepted_response(self, request: Request) -> Response: + return Response(status_code=202)