diff --git a/azure/durable_functions/__init__.py b/azure/durable_functions/__init__.py index e1bb4e58..a38fdba2 100644 --- a/azure/durable_functions/__init__.py +++ b/azure/durable_functions/__init__.py @@ -79,3 +79,11 @@ def validate_extension_bundles(): __all__.append('Blueprint') except ModuleNotFoundError: pass + +# Import OpenAI Agents integration (optional dependency) +try: + from . import openai_agents # noqa + __all__.append('openai_agents') +except ImportError: + # OpenAI agents integration requires additional dependencies + pass diff --git a/azure/durable_functions/decorators/durable_app.py b/azure/durable_functions/decorators/durable_app.py index 62b5b704..219b7dc8 100644 --- a/azure/durable_functions/decorators/durable_app.py +++ b/azure/durable_functions/decorators/durable_app.py @@ -1,6 +1,8 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger,\ + +from azure.durable_functions.models.RetryOptions import RetryOptions +from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger, \ DurableClient from typing import Callable, Optional from azure.durable_functions.entity import Entity @@ -45,6 +47,7 @@ def __init__(self, New instance of a Durable Functions app """ super().__init__(auth_level=http_auth_level) + self._is_durable_openai_agent_setup = False def _configure_entity_callable(self, wrap) -> Callable: """Obtain decorator to construct an Entity class from a user-defined Function. @@ -250,6 +253,66 @@ def decorator(): return wrap + def _create_invoke_model_activity(self, model_provider, activity_name): + """Create and register the invoke_model_activity function with the provided FunctionApp.""" + + @self.activity_trigger(input_name="input", activity=activity_name) + async def run_model_activity(input: str): + from azure.durable_functions.openai_agents.orchestrator_generator\ + import durable_openai_agent_activity + + return await durable_openai_agent_activity(input, model_provider) + + return run_model_activity + + def _setup_durable_openai_agent(self, model_provider, activity_name): + if not self._is_durable_openai_agent_setup: + self._create_invoke_model_activity(model_provider, activity_name) + self._is_durable_openai_agent_setup = True + + def durable_openai_agent_orchestrator( + self, + _func=None, + *, + model_provider=None, + model_retry_options: Optional[RetryOptions] = RetryOptions( + first_retry_interval_in_milliseconds=2000, max_number_of_attempts=5 + ), + ): + """Decorate Azure Durable Functions orchestrators that use OpenAI Agents. + + Parameters + ---------- + model_provider: Optional[ModelProvider] + Use a non-default ModelProvider instead of the default OpenAIProvider, + such as when testing. 
+ """ + from agents import ModelProvider + from azure.durable_functions.openai_agents.orchestrator_generator\ + import durable_openai_agent_orchestrator_generator + + if model_provider is not None and type(model_provider) is not ModelProvider: + raise TypeError("Provided model provider must be of type ModelProvider") + + activity_name = "run_model" + + self._setup_durable_openai_agent(model_provider, activity_name) + + def generator_wrapper_wrapper(func): + + @wraps(func) + def generator_wrapper(context): + return durable_openai_agent_orchestrator_generator( + func, context, model_retry_options, activity_name + ) + + return generator_wrapper + + if _func is None: + return generator_wrapper_wrapper + else: + return generator_wrapper_wrapper(_func) + class DFApp(Blueprint, FunctionRegister): """Durable Functions (DF) app. diff --git a/azure/durable_functions/openai_agents/__init__.py b/azure/durable_functions/openai_agents/__init__.py new file mode 100644 index 00000000..fb2aa87a --- /dev/null +++ b/azure/durable_functions/openai_agents/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +"""OpenAI Agents integration for Durable Functions. + +This module provides decorators and utilities to integrate OpenAI Agents +with Durable Functions orchestration patterns. +""" + +from .context import DurableAIAgentContext + +__all__ = [ + 'DurableAIAgentContext', +] diff --git a/azure/durable_functions/openai_agents/context.py b/azure/durable_functions/openai_agents/context.py new file mode 100644 index 00000000..58a396b5 --- /dev/null +++ b/azure/durable_functions/openai_agents/context.py @@ -0,0 +1,194 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import json +from typing import Any, Callable, Optional, TYPE_CHECKING, Union + +from azure.durable_functions.models.DurableOrchestrationContext import ( + DurableOrchestrationContext, +) +from azure.durable_functions.models.RetryOptions import RetryOptions + +from agents import RunContextWrapper, Tool +from agents.function_schema import function_schema +from agents.tool import FunctionTool + +from azure.durable_functions.models.Task import TaskBase +from .task_tracker import TaskTracker + + +if TYPE_CHECKING: + # At type-check time we want all members / signatures for IDE & linters. + _BaseDurableContext = DurableOrchestrationContext +else: + class _BaseDurableContext: # lightweight runtime stub + """Runtime stub base class for delegation; real context is wrapped. + + At runtime we avoid inheriting from DurableOrchestrationContext so that + attribute lookups for its members are delegated via __getattr__ to the + wrapped ``_context`` instance. + """ + + __slots__ = () + + +class DurableAIAgentContext(_BaseDurableContext): + """Context for AI agents running in Azure Durable Functions orchestration. + + Design + ------ + * Static analysis / IDEs: Appears to subclass ``DurableOrchestrationContext`` so + you get autocompletion and type hints (under TYPE_CHECKING branch). + * Runtime: Inherits from a trivial stub. All durable orchestration operations + are delegated to the real ``DurableOrchestrationContext`` instance provided + as ``context`` and stored in ``_context``. + + Consequences + ------------ + * ``isinstance(DurableAIAgentContext, DurableOrchestrationContext)`` is **False** at + runtime (expected). + * Delegation via ``__getattr__`` works for every member of the real context. 
+ * No reliance on internal initialization side-effects of the durable SDK. + """ + + def __init__( + self, + context: DurableOrchestrationContext, + task_tracker: TaskTracker, + model_retry_options: Optional[RetryOptions], + ): + self._context = context + self._task_tracker = task_tracker + self._model_retry_options = model_retry_options + + def call_activity( + self, name: Union[str, Callable], input_: Optional[Any] = None + ) -> TaskBase: + """Schedule an activity for execution. + + Parameters + ---------- + name: str | Callable + Either the name of the activity function to call, as a string or, + in the Python V2 programming model, the activity function itself. + input_: Optional[Any] + The JSON-serializable input to pass to the activity function. + + Returns + ------- + Task + A Durable Task that completes when the called activity function completes or fails. + """ + task = self._context.call_activity(name, input_) + self._task_tracker.record_activity_call() + return task + + def call_activity_with_retry( + self, + name: Union[str, Callable], + retry_options: RetryOptions, + input_: Optional[Any] = None, + ) -> TaskBase: + """Schedule an activity for execution with retry options. + + Parameters + ---------- + name: str | Callable + Either the name of the activity function to call, as a string or, + in the Python V2 programming model, the activity function itself. + retry_options: RetryOptions + The retry options for the activity function. + input_: Optional[Any] + The JSON-serializable input to pass to the activity function. + + Returns + ------- + Task + A Durable Task that completes when the called activity function completes or + fails completely. + """ + task = self._context.call_activity_with_retry(name, retry_options, input_) + self._task_tracker.record_activity_call() + return task + + def create_activity_tool( + self, + activity_func: Callable, + *, + description: Optional[str] = None, + retry_options: Optional[RetryOptions] = RetryOptions( + first_retry_interval_in_milliseconds=2000, max_number_of_attempts=5 + ), + ) -> Tool: + """Convert an Azure Durable Functions activity to an OpenAI Agents SDK Tool. 
+ + Args + ---- + activity_func: The Azure Functions activity function to convert + description: Optional description override for the tool + retry_options: The retry options for the activity function + + Returns + ------- + Tool: An OpenAI Agents SDK Tool object + + """ + if activity_func._function is None: + raise ValueError("The provided function is not a valid Azure Function.") + + if (activity_func._function._trigger is not None + and activity_func._function._trigger.activity is not None): + activity_name = activity_func._function._trigger.activity + else: + activity_name = activity_func._function._name + + input_name = None + if (activity_func._function._trigger is not None + and hasattr(activity_func._function._trigger, 'name')): + input_name = activity_func._function._trigger.name + + async def run_activity(ctx: RunContextWrapper[Any], input: str) -> Any: + # Parse JSON input and extract the named value if input_name is specified + activity_input = input + if input_name: + try: + parsed_input = json.loads(input) + if isinstance(parsed_input, dict) and input_name in parsed_input: + activity_input = parsed_input[input_name] + # If parsing fails or the named parameter is not found, pass the original input + except (json.JSONDecodeError, TypeError): + pass + + if retry_options: + result = self._task_tracker.get_activity_call_result_with_retry( + activity_name, retry_options, activity_input + ) + else: + result = self._task_tracker.get_activity_call_result(activity_name, activity_input) + return result + + schema = function_schema( + func=activity_func._function._func, + docstring_style=None, + description_override=description, + use_docstring_info=True, + strict_json_schema=True, + ) + + return FunctionTool( + name=schema.name, + description=schema.description or "", + params_json_schema=schema.params_json_schema, + on_invoke_tool=run_activity, + strict_json_schema=True, + ) + + def __getattr__(self, name): + """Delegate missing attributes to the underlying DurableOrchestrationContext.""" + try: + return getattr(self._context, name) + except AttributeError: + raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") + + def __dir__(self): + """Improve introspection and tab-completion by including delegated attributes.""" + return sorted(set(dir(type(self)) + list(self.__dict__) + dir(self._context))) diff --git a/azure/durable_functions/openai_agents/event_loop.py b/azure/durable_functions/openai_agents/event_loop.py new file mode 100644 index 00000000..6b85a976 --- /dev/null +++ b/azure/durable_functions/openai_agents/event_loop.py @@ -0,0 +1,17 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import asyncio + + +def ensure_event_loop(): + """Ensure an event loop is available for sync execution context. + + This is necessary when calling Runner.run_sync from Azure Functions + Durable orchestrators, which run in a synchronous context but need + an event loop for internal async operations. + """ + try: + asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) diff --git a/azure/durable_functions/openai_agents/exceptions.py b/azure/durable_functions/openai_agents/exceptions.py new file mode 100644 index 00000000..38834a52 --- /dev/null +++ b/azure/durable_functions/openai_agents/exceptions.py @@ -0,0 +1,11 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
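+"""Exception types for Azure Durable Functions OpenAI agent orchestration."""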
+from azure.durable_functions.models.Task import TaskBase + + +class YieldException(BaseException): + """Exception raised when an orchestrator should yield control.""" + + def __init__(self, task: TaskBase): + super().__init__("Orchestrator should yield.") + self.task = task diff --git a/azure/durable_functions/openai_agents/handoffs.py b/azure/durable_functions/openai_agents/handoffs.py new file mode 100644 index 00000000..e5140646 --- /dev/null +++ b/azure/durable_functions/openai_agents/handoffs.py @@ -0,0 +1,67 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +"""Handoff conversion utilities for Azure Durable Functions OpenAI agent operations.""" + +from typing import Any + +from agents import Handoff +from pydantic import BaseModel + + +class DurableHandoff(BaseModel): + """Serializable representation of a Handoff. + + Contains only the data needed by the model execution to + determine what to handoff to, not the actual handoff invocation. + """ + + tool_name: str + tool_description: str + input_json_schema: dict[str, Any] + agent_name: str + strict_json_schema: bool = True + + @classmethod + def from_handoff(cls, handoff: Handoff) -> "DurableHandoff": + """Create a DurableHandoff from an OpenAI agent Handoff. + + This method converts OpenAI agent Handoff instances into serializable + DurableHandoff objects for use within Azure Durable Functions. + + Parameters + ---------- + handoff : Handoff + The OpenAI agent Handoff to convert + + Returns + ------- + DurableHandoff + A serializable handoff representation + """ + return cls( + tool_name=handoff.tool_name, + tool_description=handoff.tool_description, + input_json_schema=handoff.input_json_schema, + agent_name=handoff.agent_name, + strict_json_schema=handoff.strict_json_schema, + ) + + def to_handoff(self) -> Handoff[Any, Any]: + """Create an OpenAI agent Handoff instance from this DurableHandoff. + + This method converts the serializable DurableHandoff back into an + OpenAI agent Handoff instance for execution. + + Returns + ------- + Handoff + OpenAI agent Handoff instance + """ + return Handoff( + tool_name=self.tool_name, + tool_description=self.tool_description, + input_json_schema=self.input_json_schema, + agent_name=self.agent_name, + strict_json_schema=self.strict_json_schema, + on_invoke_handoff=lambda ctx, input: None, + ) diff --git a/azure/durable_functions/openai_agents/model_invocation_activity.py b/azure/durable_functions/openai_agents/model_invocation_activity.py new file mode 100644 index 00000000..2d4d6f1d --- /dev/null +++ b/azure/durable_functions/openai_agents/model_invocation_activity.py @@ -0,0 +1,268 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
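+"""Serializable model-invocation inputs and the durable Model implementation."""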
+import enum +import json +from typing import Any, AsyncIterator, Optional, Union, cast + +from azure.durable_functions.models.RetryOptions import RetryOptions +from pydantic import BaseModel, Field +from agents import ( + AgentOutputSchema, + AgentOutputSchemaBase, + Handoff, + Model, + ModelProvider, + ModelResponse, + ModelSettings, + ModelTracing, + OpenAIProvider, + Tool, + TResponseInputItem, + UserError, +) +from agents.items import TResponseStreamEvent +from openai.types.responses.response_prompt_param import ResponsePromptParam + +from .task_tracker import TaskTracker +from .tools import ( + DurableTool, + create_tool_from_durable_tool, + convert_tool_to_durable_tool, +) +from .handoffs import DurableHandoff + + +class DurableAgentOutputSchema(AgentOutputSchemaBase, BaseModel): + """Serializable representation of agent output schema.""" + + output_type_name: Optional[str] = None + output_schema: Optional[dict[str, Any]] = None + strict_json_schema: bool + + def is_plain_text(self) -> bool: + """Whether the output type is plain text (versus a JSON object).""" + return self.output_type_name in (None, "str") + + def name(self) -> str: + """Get the name of the output type.""" + if self.output_type_name is None: + raise ValueError("Output type name has not been specified") + return self.output_type_name + + def json_schema(self) -> dict[str, Any]: + """Return the JSON schema of the output. + + Will only be called if the output type is not plain text. + """ + if self.is_plain_text(): + raise UserError("Cannot provide JSON schema for plain text output types") + if self.output_schema is None: + raise UserError("Output schema definition is missing") + return self.output_schema + + def is_strict_json_schema(self) -> bool: + """Check if the JSON schema is in strict mode. + + Strict mode constrains the JSON schema features, but guarantees valid JSON. + See here for details: + https://platform.openai.com/docs/guides/structured-outputs#supported-schemas + """ + return self.strict_json_schema + + def validate_json(self, json_str: str) -> Any: + """Validate a JSON string against the output type. + + You must return the validated object, or raise a `ModelBehaviorError` if + the JSON is invalid. + """ + raise NotImplementedError() + + +class ModelTracingLevel(enum.IntEnum): + """Serializable IntEnum representation of ModelTracing for Azure Durable Functions. + + Values must match ModelTracing from the OpenAI SDK. This separate enum is required + because ModelTracing is a standard Enum while Pydantic serialization requires IntEnum + for proper JSON serialization in activity inputs. 
+ """ + + DISABLED = 0 + ENABLED = 1 + ENABLED_WITHOUT_DATA = 2 + + +class DurableModelActivityInput(BaseModel): + """Serializable input for the durable model invocation activity.""" + + input: Union[str, list[TResponseInputItem]] + model_settings: ModelSettings + tracing: ModelTracingLevel + model_name: Optional[str] = None + system_instructions: Optional[str] = None + tools: list[DurableTool] = Field(default_factory=list) + output_schema: Optional[DurableAgentOutputSchema] = None + handoffs: list[DurableHandoff] = Field(default_factory=list) + previous_response_id: Optional[str] = None + prompt: Optional[Any] = None + + def to_json(self) -> str: + """Convert to a JSON string.""" + try: + return self.model_dump_json(warnings=False) + except Exception: + # Fallback to basic JSON serialization + try: + return json.dumps(self.model_dump(warnings=False), default=str) + except Exception as fallback_error: + raise ValueError( + f"Unable to serialize DurableModelActivityInput: {fallback_error}" + ) from fallback_error + + @classmethod + def from_json(cls, json_str: str) -> 'DurableModelActivityInput': + """Create from a JSON string.""" + return cls.model_validate_json(json_str) + + +class ModelInvoker: + """Handles OpenAI model invocations for Durable Functions activities.""" + + def __init__(self, model_provider: Optional[ModelProvider] = None): + """Initialize the activity with a model provider.""" + self._model_provider = model_provider or OpenAIProvider() + + async def invoke_model_activity(self, input: DurableModelActivityInput) -> ModelResponse: + """Activity that invokes a model with the given input.""" + model = self._model_provider.get_model(input.model_name) + + # Avoid https://github.com/pydantic/pydantic/issues/9541 + normalized_input = json.loads(json.dumps(input.input, default=str)) + + # Convert durable tools to agent tools + tools = [ + create_tool_from_durable_tool(durable_tool) + for durable_tool in input.tools + ] + + # Convert handoff descriptors to agent handoffs + handoffs = [ + durable_handoff.to_handoff() + for durable_handoff in input.handoffs + ] + + return await model.get_response( + system_instructions=input.system_instructions, + input=normalized_input, + model_settings=input.model_settings, + tools=tools, + output_schema=input.output_schema, + handoffs=handoffs, + tracing=ModelTracing(input.tracing), + previous_response_id=input.previous_response_id, + prompt=input.prompt, + ) + + +class DurableActivityModel(Model): + """A model implementation that uses durable activities for model invocations.""" + + def __init__( + self, + model_name: Optional[str], + task_tracker: TaskTracker, + retry_options: Optional[RetryOptions], + activity_name: str, + ) -> None: + self.model_name = model_name + self.task_tracker = task_tracker + self.retry_options = retry_options + self.activity_name = activity_name + + async def get_response( + self, + system_instructions: Optional[str], + input: Union[str, list[TResponseInputItem]], + model_settings: ModelSettings, + tools: list[Tool], + output_schema: Optional[AgentOutputSchemaBase], + handoffs: list[Handoff], + tracing: ModelTracing, + *, + previous_response_id: Optional[str], + prompt: Optional[ResponsePromptParam], + conversation_id: Optional[str] = None, + ) -> ModelResponse: + """Get a response from the model.""" + # Convert agent tools to Durable tools + durable_tools = [convert_tool_to_durable_tool(tool) for tool in tools] + + # Convert agent handoffs to Durable handoff descriptors + durable_handoffs = 
[DurableHandoff.from_handoff(handoff) for handoff in handoffs] + if output_schema is not None and not isinstance( + output_schema, AgentOutputSchema + ): + raise TypeError( + f"Only AgentOutputSchema is supported by Durable Model, " + f"got {type(output_schema).__name__}" + ) + + output_schema_input = ( + None + if output_schema is None + else DurableAgentOutputSchema( + output_type_name=output_schema.name(), + output_schema=( + output_schema.json_schema() + if not output_schema.is_plain_text() + else None + ), + strict_json_schema=output_schema.is_strict_json_schema(), + ) + ) + + activity_input = DurableModelActivityInput( + model_name=self.model_name, + system_instructions=system_instructions, + input=cast(Union[str, list[TResponseInputItem]], input), + model_settings=model_settings, + tools=durable_tools, + output_schema=output_schema_input, + handoffs=durable_handoffs, + tracing=ModelTracingLevel.DISABLED, # ModelTracingLevel(tracing.value), + previous_response_id=previous_response_id, + prompt=prompt, + ) + + activity_input_json = activity_input.to_json() + + if self.retry_options: + response = self.task_tracker.get_activity_call_result_with_retry( + self.activity_name, + self.retry_options, + activity_input_json, + ) + else: + response = self.task_tracker.get_activity_call_result( + self.activity_name, + activity_input_json + ) + + json_response = json.loads(response) + model_response = ModelResponse(**json_response) + return model_response + + def stream_response( + self, + system_instructions: Optional[str], + input: Union[str, list[TResponseInputItem]], + model_settings: ModelSettings, + tools: list[Tool], + output_schema: Optional[AgentOutputSchemaBase], + handoffs: list[Handoff], + tracing: ModelTracing, + *, + previous_response_id: Optional[str], + prompt: Optional[ResponsePromptParam], + ) -> AsyncIterator[TResponseStreamEvent]: + """Stream a response from the model.""" + raise NotImplementedError("Durable model doesn't support streams yet") diff --git a/azure/durable_functions/openai_agents/orchestrator_generator.py b/azure/durable_functions/openai_agents/orchestrator_generator.py new file mode 100644 index 00000000..6cc163c7 --- /dev/null +++ b/azure/durable_functions/openai_agents/orchestrator_generator.py @@ -0,0 +1,67 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from functools import partial +from typing import Optional +from agents import ModelProvider, ModelResponse +from agents.run import set_default_agent_runner +from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext +from azure.durable_functions.models.RetryOptions import RetryOptions +from .model_invocation_activity import DurableModelActivityInput, ModelInvoker +from .task_tracker import TaskTracker +from .runner import DurableOpenAIRunner +from .context import DurableAIAgentContext +from .event_loop import ensure_event_loop +from .usage_telemetry import UsageTelemetry + + +async def durable_openai_agent_activity(input: str, model_provider: ModelProvider) -> str: + """Activity logic that handles OpenAI model invocations.""" + activity_input = DurableModelActivityInput.from_json(input) + + model_invoker = ModelInvoker(model_provider=model_provider) + result = await model_invoker.invoke_model_activity(activity_input) + + # Use safe/public Pydantic API when possible. Prefer model_dump_json if result is a BaseModel + # Otherwise handle common types (str/bytes/dict/list) and fall back to json.dumps. 
+ import json as _json + + if hasattr(result, "model_dump_json"): + # Pydantic v2 BaseModel + json_str = result.model_dump_json() + else: + if isinstance(result, bytes): + json_str = result.decode() + elif isinstance(result, str): + json_str = result + else: + # Try the internal serializer as a last resort, but fall back to json.dumps + try: + json_bytes = ModelResponse.__pydantic_serializer__.to_json(result) + json_str = json_bytes.decode() + except Exception: + json_str = _json.dumps(result) + + return json_str + + +def durable_openai_agent_orchestrator_generator( + func, + durable_orchestration_context: DurableOrchestrationContext, + model_retry_options: Optional[RetryOptions], + activity_name: str, +): + """Adapts the synchronous OpenAI Agents function to an Durable orchestrator generator.""" + # Log versions the first time this generator is invoked + UsageTelemetry.log_usage_once() + + ensure_event_loop() + task_tracker = TaskTracker(durable_orchestration_context) + durable_ai_agent_context = DurableAIAgentContext( + durable_orchestration_context, task_tracker, model_retry_options + ) + durable_openai_runner = DurableOpenAIRunner( + context=durable_ai_agent_context, activity_name=activity_name) + set_default_agent_runner(durable_openai_runner) + + func_with_context = partial(func, durable_ai_agent_context) + return task_tracker.execute_orchestrator_function(func_with_context) diff --git a/azure/durable_functions/openai_agents/runner.py b/azure/durable_functions/openai_agents/runner.py new file mode 100644 index 00000000..43981607 --- /dev/null +++ b/azure/durable_functions/openai_agents/runner.py @@ -0,0 +1,103 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import json +from dataclasses import replace +from typing import Any, Union + +from agents import ( + Agent, + RunConfig, + RunResult, + RunResultStreaming, + TContext, + TResponseInputItem, +) +from agents.run import DEFAULT_AGENT_RUNNER, DEFAULT_MAX_TURNS, AgentRunner +from pydantic_core import to_json + +from .context import DurableAIAgentContext +from .model_invocation_activity import DurableActivityModel + + +class DurableOpenAIRunner: + """Runner for OpenAI agents using Durable Functions orchestration.""" + + def __init__(self, context: DurableAIAgentContext, activity_name: str) -> None: + self._runner = DEFAULT_AGENT_RUNNER or AgentRunner() + self._context = context + self._activity_name = activity_name + + def _prepare_run_config( + self, + starting_agent: Agent[TContext], + input: Union[str, list[TResponseInputItem]], + **kwargs: Any, + ) -> tuple[Union[str, list[TResponseInputItem]], RunConfig, dict[str, Any]]: + """Prepare and validate the run configuration and parameters for agent execution.""" + # Avoid https://github.com/pydantic/pydantic/issues/9541 + normalized_input = json.loads(to_json(input)) + + run_config = kwargs.get("run_config") or RunConfig() + + model_name = run_config.model or starting_agent.model + if model_name and not isinstance(model_name, str): + raise ValueError( + "For agent execution in Durable Functions, model name in run_config or " + "starting_agent must be a string." 
+ ) + + updated_run_config = replace( + run_config, + model=DurableActivityModel( + model_name=model_name, + task_tracker=self._context._task_tracker, + retry_options=self._context._model_retry_options, + activity_name=self._activity_name, + ), + ) + + run_params = { + "context": kwargs.get("context"), + "max_turns": kwargs.get("max_turns", DEFAULT_MAX_TURNS), + "hooks": kwargs.get("hooks"), + "previous_response_id": kwargs.get("previous_response_id"), + "session": kwargs.get("session"), + } + + return normalized_input, updated_run_config, run_params + + def run_sync( + self, + starting_agent: Agent[TContext], + input: Union[str, list[TResponseInputItem]], + **kwargs: Any, + ) -> RunResult: + """Run an agent synchronously with the given input and configuration.""" + normalized_input, updated_run_config, run_params = self._prepare_run_config( + starting_agent, input, **kwargs + ) + + return self._runner.run_sync( + starting_agent=starting_agent, + input=normalized_input, + run_config=updated_run_config, + **run_params, + ) + + def run( + self, + starting_agent: Agent[TContext], + input: Union[str, list[TResponseInputItem]], + **kwargs: Any, + ) -> RunResult: + """Run an agent asynchronously. Not supported in Durable Functions.""" + raise RuntimeError("Durable Functions do not support asynchronous runs.") + + def run_streamed( + self, + starting_agent: Agent[TContext], + input: Union[str, list[TResponseInputItem]], + **kwargs: Any, + ) -> RunResultStreaming: + """Run an agent with streaming. Not supported in Durable Functions.""" + raise RuntimeError("Durable Functions do not support streaming.") diff --git a/azure/durable_functions/openai_agents/task_tracker.py b/azure/durable_functions/openai_agents/task_tracker.py new file mode 100644 index 00000000..2c68cb13 --- /dev/null +++ b/azure/durable_functions/openai_agents/task_tracker.py @@ -0,0 +1,171 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import json +import inspect +from typing import Any + +from azure.durable_functions.models.DurableOrchestrationContext import ( + DurableOrchestrationContext, +) +from azure.durable_functions.models.history.HistoryEventType import HistoryEventType +from azure.durable_functions.models.RetryOptions import RetryOptions + +from .exceptions import YieldException + + +class TaskTracker: + """Tracks activity calls and handles task result processing for durable AI agents.""" + + def __init__(self, context: DurableOrchestrationContext): + self._context = context + self._activities_called = 0 + self._tasks_to_yield = [] + + def _get_activity_result_or_raise(self, task): + """Return the activity result if available; otherwise raise ``YieldException`` to defer. + + The first time an activity is scheduled its result won't yet exist in the + orchestration history, so we raise ``YieldException`` with the task so the + orchestrator can yield it. On replay, once the corresponding TASK_COMPLETED + history event is present, we capture the result and queue the task for a + later yield (to preserve ordering) while returning the deserialized value. + """ + self.record_activity_call() + + histories = self._context.histories + completed_tasks = [ + entry for entry in histories + if entry.event_type == HistoryEventType.TASK_COMPLETED + ] + if len(completed_tasks) < self._activities_called: + # Result not yet available in history -> raise to signal a yield now + raise YieldException(task) + # Result exists (replay). Queue task to be yielded after returning value. 
+ # + # We cannot just yield it now because this method can be called from + # deeply nested code paths that we don't control (such as the + # OpenAI Agents SDK internals), and yielding here would lead to + # unintended behavior. Instead, we queue the task to be yielded + # later and return the result recorded in the history, so the + # code invoking this method can continue executing normally. + self._tasks_to_yield.append(task) + + result_json = completed_tasks[self._activities_called - 1].Result + result = json.loads(result_json) + return result + + def get_activity_call_result(self, activity_name, input: Any): + """Call an activity and return its result or raise ``YieldException`` if pending.""" + task = self._context.call_activity(activity_name, input) + return self._get_activity_result_or_raise(task) + + def get_activity_call_result_with_retry( + self, activity_name, retry_options: RetryOptions, input: Any + ): + """Call an activity with retry and return its result or raise YieldException if pending.""" + task = self._context.call_activity_with_retry(activity_name, retry_options, input) + return self._get_activity_result_or_raise(task) + + def record_activity_call(self): + """Record that an activity was called.""" + self._activities_called += 1 + + def _yield_and_clear_tasks(self): + """Yield all accumulated tasks and clear the tasks list.""" + for task in self._tasks_to_yield: + yield task + self._tasks_to_yield.clear() + + def execute_orchestrator_function(self, func): + """Execute the orchestrator function with comprehensive task and exception handling. + + The orchestrator function can exhibit any combination of the following behaviors: + - Execute regular code and return a value or raise an exception + - Invoke get_activity_call_result or get_activity_call_result_with_retry, which leads to + either interrupting the orchestrator function immediately (because of YieldException), + or queueing the task for later yielding while continuing execution + - Invoke DurableAIAgentContext.call_activity or call_activity_with_retry (which must lead + to corresponding record_activity_call invocations) + - Yield tasks (typically produced by DurableAIAgentContext methods like call_activity, + wait_for_external_event, etc.), which may or may not interrupt orchestrator function + execution + - Mix all of the above in any combination + + This method converts both YieldException and regular yields into a sequence of yields + preserving the order, while also capturing return values through the generator protocol. + For example, if the orchestrator function yields task A, then queues task B for yielding, + then raises YieldException wrapping task C, this method makes sure that the resulting + sequence of yields is: (A, B, C). 
+ + Args + ---- + func: The orchestrator function to execute (generator or regular function) + + Yields + ------ + Tasks yielded by the orchestrator function and tasks wrapped in YieldException + + Returns + ------- + The return value from the orchestrator function + """ + if inspect.isgeneratorfunction(func): + gen = iter(func()) + try: + # prime the subiterator + value = next(gen) + yield from self._yield_and_clear_tasks() + while True: + try: + # send whatever was sent into us down to the subgenerator + yield from self._yield_and_clear_tasks() + sent = yield value + except GeneratorExit: + # ensure the subgenerator is closed + if hasattr(gen, "close"): + gen.close() + raise + except BaseException as exc: + # forward thrown exceptions if possible + if hasattr(gen, "throw"): + value = gen.throw(type(exc), exc, exc.__traceback__) + else: + raise + else: + # normal path: forward .send (or .__next__) + if hasattr(gen, "send"): + value = gen.send(sent) + else: + value = next(gen) + except StopIteration as e: + yield from self._yield_and_clear_tasks() + return TaskTracker._durable_serializer(e.value) + except YieldException as e: + yield from self._yield_and_clear_tasks() + yield e.task + else: + try: + result = func() + return TaskTracker._durable_serializer(result) + except YieldException as e: + yield from self._yield_and_clear_tasks() + yield e.task + finally: + yield from self._yield_and_clear_tasks() + + @staticmethod + def _durable_serializer(obj: Any) -> str: + # Strings are already "serialized" + if type(obj) is str: + return obj + + # Serialize "Durable" and OpenAI models, and typed dictionaries + if callable(getattr(obj, "to_json", None)): + return obj.to_json() + + # Serialize Pydantic models + if callable(getattr(obj, "model_dump_json", None)): + return obj.model_dump_json() + + # Fallback to default JSON serialization + return json.dumps(obj) diff --git a/azure/durable_functions/openai_agents/tools.py b/azure/durable_functions/openai_agents/tools.py new file mode 100644 index 00000000..1ff6b543 --- /dev/null +++ b/azure/durable_functions/openai_agents/tools.py @@ -0,0 +1,148 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +"""Tool conversion utilities for Azure Durable Functions OpenAI agent operations.""" + +from typing import Any, Union + +from agents import ( + CodeInterpreterTool, + FileSearchTool, + FunctionTool, + HostedMCPTool, + ImageGenerationTool, + Tool, + UserError, + WebSearchTool, +) +from openai.types.responses.tool_param import Mcp +from pydantic import BaseModel + + +# Built-in tool types that can be serialized directly without conversion +BUILT_IN_TOOL_TYPES = ( + FileSearchTool, + WebSearchTool, + ImageGenerationTool, + CodeInterpreterTool, +) + + +class DurableFunctionTool(BaseModel): + """Serializable representation of a FunctionTool. + + Contains only the data needed by the model execution to + determine what tool to call, not the actual tool invocation. + """ + + name: str + description: str + params_json_schema: dict[str, Any] + strict_json_schema: bool = True + + +class DurableMCPToolConfig(BaseModel): + """Serializable representation of a HostedMCPTool. + + Contains only the data needed by the model execution to + determine what tool to call, not the actual tool invocation. 
+ """ + + tool_config: Mcp + + +DurableTool = Union[ + DurableFunctionTool, + FileSearchTool, + WebSearchTool, + ImageGenerationTool, + CodeInterpreterTool, + DurableMCPToolConfig, +] + + +def create_tool_from_durable_tool( + durable_tool: DurableTool, +) -> Tool: + """Convert a DurableTool to an OpenAI agent Tool for execution. + + This function transforms Durable Functions tool definitions into actual + OpenAI agent Tool instances that can be used during model execution. + + Parameters + ---------- + durable_tool : DurableTool + The Durable tool definition to convert + + Returns + ------- + Tool + An OpenAI agent Tool instance ready for execution + + Raises + ------ + UserError + If the tool type is not supported + """ + # Built-in tools that don't need conversion + if isinstance(durable_tool, BUILT_IN_TOOL_TYPES): + return durable_tool + + # Convert Durable MCP tool configuration to HostedMCPTool + if isinstance(durable_tool, DurableMCPToolConfig): + return HostedMCPTool( + tool_config=durable_tool.tool_config, + ) + + # Convert Durable function tool to FunctionTool + if isinstance(durable_tool, DurableFunctionTool): + return FunctionTool( + name=durable_tool.name, + description=durable_tool.description, + params_json_schema=durable_tool.params_json_schema, + on_invoke_tool=lambda ctx, input: "", + strict_json_schema=durable_tool.strict_json_schema, + ) + + raise UserError(f"Unsupported tool type: {durable_tool}") + + +def convert_tool_to_durable_tool(tool: Tool) -> DurableTool: + """Convert an OpenAI agent Tool to a DurableTool for serialization. + + This function transforms OpenAI agent Tool instances into Durable Functions + tool definitions that can be serialized and passed to activities. + + Parameters + ---------- + tool : Tool + The OpenAI agent Tool to convert + + Returns + ------- + DurableTool + A serializable tool definition + + Raises + ------ + ValueError + If the tool type is not supported for conversion + """ + # Built-in tools that can be serialized directly + if isinstance(tool, BUILT_IN_TOOL_TYPES): + return tool + + # Convert HostedMCPTool to Durable MCP configuration + elif isinstance(tool, HostedMCPTool): + return DurableMCPToolConfig(tool_config=tool.tool_config) + + # Convert FunctionTool to Durable function tool + elif isinstance(tool, FunctionTool): + return DurableFunctionTool( + name=tool.name, + description=tool.description, + params_json_schema=tool.params_json_schema, + strict_json_schema=tool.strict_json_schema, + ) + + else: + raise ValueError(f"Unsupported tool type for Durable Functions: {type(tool).__name__}") diff --git a/azure/durable_functions/openai_agents/usage_telemetry.py b/azure/durable_functions/openai_agents/usage_telemetry.py new file mode 100644 index 00000000..3ae824ea --- /dev/null +++ b/azure/durable_functions/openai_agents/usage_telemetry.py @@ -0,0 +1,69 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + + +class UsageTelemetry: + """Handles telemetry logging for OpenAI Agents SDK integration usage.""" + + # Class-level flag to ensure logging happens only once across all instances + _usage_logged = False + + @classmethod + def log_usage_once(cls): + """Log OpenAI Agents SDK integration usage exactly once. + + Fails gracefully if metadata cannot be retrieved. 
+ """ + if cls._usage_logged: + return + + # NOTE: Any log line beginning with the special prefix defined below will be + # captured by the Azure Functions host as a Language Worker console log and + # forwarded to internal telemetry pipelines. + # Do not change this constant value without coordinating with the Functions + # host team. + LANGUAGE_WORKER_CONSOLE_LOG_PREFIX = "LanguageWorkerConsoleLog" + + package_versions = cls._collect_openai_agent_package_versions() + msg = ( + f"{LANGUAGE_WORKER_CONSOLE_LOG_PREFIX}" # Prefix captured by Azure Functions host + "Detected OpenAI Agents SDK integration with Durable Functions. " + f"Package versions: {package_versions}" + ) + print(msg) + + cls._usage_logged = True + + @classmethod + def _collect_openai_agent_package_versions(cls) -> str: + """Collect versions of relevant packages for telemetry logging. + + Returns + ------- + str + Comma-separated list of name=version entries or "(unavailable)" if + versions could not be determined. + """ + try: + try: + from importlib import metadata # Python 3.8+ + except ImportError: # pragma: no cover - legacy fallback + import importlib_metadata as metadata # type: ignore + + package_names = [ + "azure-functions-durable", + "openai", + "openai-agents", + ] + + versions = [] + for package_name in package_names: + try: + ver = metadata.version(package_name) + versions.append(f"{package_name}={ver}") + except Exception: # noqa: BLE001 - swallow and continue + versions.append(f"{package_name}=(not installed)") + + return ", ".join(versions) if versions else "(unavailable)" + except Exception: # noqa: BLE001 - never let version gathering break user code + return "(unavailable)" diff --git a/docs/openai_agents/README.md b/docs/openai_agents/README.md new file mode 100644 index 00000000..09c831b4 --- /dev/null +++ b/docs/openai_agents/README.md @@ -0,0 +1,20 @@ +# Durable OpenAI Agents + +Build production-ready AI agents with automatic state persistence and failure recovery. + +## Overview + +The Durable OpenAI Agents integration combines the familiar OpenAI Agents SDK with Azure Durable Functions to create reliable, stateful AI agents that can survive any failure and continue exactly where they stopped. + +## Key Benefits + +- **Enhanced Agent Resilience**: Built-in retry mechanisms for LLM calls and tool executions +- **Multi-Agent Orchestration Reliability**: Individual agent failures don't crash entire workflows +- **Built-in Observability**: Monitor agent progress through the Durable Task Scheduler dashboard +- **Familiar Developer Experience**: Keep using the OpenAI Agents SDK with minimal code changes +- **Distributed Compute and Scalability**: Agent workflows automatically scale across multiple compute instances + +## Documentation + +- [Getting Started](getting-started.md) - Setup and your first durable agent +- [Reference](reference.md) - Complete reference documentation \ No newline at end of file diff --git a/docs/openai_agents/getting-started.md b/docs/openai_agents/getting-started.md new file mode 100644 index 00000000..e68a27fc --- /dev/null +++ b/docs/openai_agents/getting-started.md @@ -0,0 +1,191 @@ +# Getting Started with Durable OpenAI Agents + +Getting started guide for implementing stateful AI agents using Azure Durable Functions orchestration with automatic checkpointing and replay semantics. 
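+
+At a glance, a durable agent is an ordinary OpenAI Agents SDK agent whose orchestrator function is wrapped by the `@app.durable_openai_agent_orchestrator` decorator. The sketch below is illustrative; the app wiring and the `hello_world` name mirror the `samples-v2/openai_agents` sample, which the rest of this guide walks through in detail:
+
+```python
+import azure.functions as func
+import azure.durable_functions as df
+from agents import Agent, Runner
+
+app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
+
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator
+def hello_world(context):
+    agent = Agent(name="Assistant", instructions="You only respond in haikus.")
+    result = Runner.run_sync(agent, "Tell me about recursion in programming.")
+    return result.final_output
+```
+
+The sample's `function_app.py` also registers an HTTP-triggered client function that starts orchestrations like this one; that is the endpoint used in the Execution and Monitoring section below.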
+ +## Prerequisites + +- Python 3.10+ runtime environment +- Azure Functions Core Tools v4.x (`npm install -g azure-functions-core-tools@4 --unsafe-perm true`) +- Azure OpenAI service endpoint with model deployment +- Docker (Optional for the Durable Task Scheduler Emulator) + +## Environment Setup + +### Create an Azure Functions App + +This framework is designed specifically for **Azure Functions applications**. You need to create a Python Functions app to use Durable OpenAI Agents. + +**For new users**: If you're new to Azure Functions, follow these guides to get started: +- [Create your first Python function in Azure](https://learn.microsoft.com/en-us/azure/azure-functions/create-first-function-vs-code-python) +- [Azure Functions Python developer guide](https://learn.microsoft.com/en-us/azure/azure-functions/functions-reference-python) + +**For experienced Functions users**: Create a new Python Functions app or use an existing one. + +**Note**: The `samples-v2/openai_agents` directory contains a complete working example you can reference or use as a starting point. + +### Set Up Local Development Environment + +Create and activate a virtual environment to isolate dependencies: + +```bash +# Create virtual environment +python -m venv venv + +# Activate virtual environment +# On macOS/Linux: +source venv/bin/activate +# On Windows: +# venv\Scripts\activate +``` + +### Install Dependencies + +Add the OpenAI Agents dependencies to your `requirements.txt`: + +``` +azure-functions-durable +azure-functions +openai +openai-agents +azure-identity +``` + +Then install them: + +```bash +pip install -r requirements.txt +``` + +### Configuring Durable Task Scheduler Backend + +**Durable Task Scheduler is the preferred backend** for this integration as it provides enhanced performance, better observability, and simplified local development. While not a hard requirement, it's strongly recommended for production workloads. + +There are two ways to configure the backend locally: + +#### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. **Pull the Docker Image for the Emulator:** +```bash +docker pull mcr.microsoft.com/dts/dts-emulator:latest +``` + +2. **Run the Emulator:** +```bash +docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest +``` + +3. **Wait for container readiness** (approximately 10-15 seconds) + +4. **Verify emulator status:** +```bash +curl http://localhost:8080/health +``` + +**Note**: The sample code automatically uses the default emulator settings (`endpoint: http://localhost:8080`, `taskhub: default`). No additional environment variables are required. + +#### Alternative: Azure Storage Backend + +If you prefer using Azure Storage as the backend (legacy approach): + +```bash +# Uses local storage emulator - requires Azurite +npm install -g azurite +azurite --silent --location /tmp/azurite --debug /tmp/azurite/debug.log +``` + +Update `local.settings.json`: +```json +{ + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true" + } +} +``` + +## Configuration + +1. **Install project dependencies:** + +```bash +pip install -r requirements.txt +``` + +2. 
**Configure service settings:** + +Update `local.settings.json` with your service configuration: + +```json +{ + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "FUNCTIONS_WORKER_RUNTIME": "python", + "AZURE_OPENAI_ENDPOINT": "https://.openai.azure.com/", + "AZURE_OPENAI_DEPLOYMENT": "", + "AZURE_OPENAI_API_VERSION": "2024-10-01-preview", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "http://localhost:8080;Authentication=None;", + "TASKHUB": "default" + } +} +``` + +## Hello World Example + +Execute the included hello world sample. + +```python +# basic/hello_world.py - Standard OpenAI Agent +from agents import Agent, Runner + +def main(): + agent = Agent( + name="Assistant", + instructions="You only respond in haikus.", + ) + result = Runner.run_sync(agent, "Tell me about recursion in programming.") + return result.final_output +``` + +**Durable Transformation**: The `@app.durable_openai_agent_orchestrator` decorator in `function_app.py` wraps this agent execution within a Durable Functions orchestrator, providing agent state persisted at each LLM and tool interaction. + +## Execution and Monitoring + +1. **Start the Azure Functions host:** + +Navigate to the `samples-v2/openai_agents` directory and run: + +```bash +func start --port 7071 +``` + +2. **Initiate orchestration instance:** + +```bash +curl -X POST http://localhost:7071/api/orchestrators/hello_world \ + -H "Content-Type: application/json" +``` + +Response contains orchestration instance metadata: + +```json +{ + "id": "f4b2c8d1e9a7...", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/f4b2c8d1e9a7...", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/f4b2c8d1e9a7.../raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/f4b2c8d1e9a7.../terminate", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/f4b2c8d1e9a7..." +} +``` + +3. **Monitor execution via Durable Task Scheduler dashboard:** + +Navigate to `http://localhost:8082` for real-time orchestration monitoring: +- Instance execution timeline with LLM call latencies +- State transition logs and checkpoint data +- Retry attempt tracking and failure analysis + +## Next Steps + +- Reference [Reference Documentation](reference.md) for complete technical details. \ No newline at end of file diff --git a/docs/openai_agents/reference.md b/docs/openai_agents/reference.md new file mode 100644 index 00000000..720cdd3d --- /dev/null +++ b/docs/openai_agents/reference.md @@ -0,0 +1,138 @@ +# Reference Documentation + +Complete reference for Durable OpenAI Agents integration. + +## Durable Orchestration + +### @app.durable_openai_agent_orchestrator + +Primary decorator enabling durable execution for agent invocations. 
+ +```python +from azure.durable_functions.openai_agents import durable_openai_agent_orchestrator + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def my_agent_orchestrator(context): + # Agent implementation + pass +``` + +**Features**: +- Automatic state persistence for agent conversations +- Built-in retry mechanisms for LLM calls +- Tool call durability and replay protection +- Integration with Durable Functions monitoring using the Durable Task Scheduler + +**Constraints**: +- Functions must be deterministic (identical outputs for identical inputs) +- No non-deterministic operations: `datetime.now()`, `random`, `uuid.uuid4()` +- See [Durable Functions Code Constraints](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-code-constraints?tabs=csharp) + +### @app.orchestration_trigger + +Azure Functions orchestration trigger decorator. Required with `@app.durable_openai_agent_orchestrator`. + +```python +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def my_orchestrator(context): + # ... +``` + +## Agent Execution + +### Runner.run_sync() + +Runner for agents in durable orchestration context. + +```python +from agents import Agent, Runner + +def my_orchestrator(context): + agent = Agent(name="Assistant", instructions="Be helpful") + result = Runner.run_sync(agent, "Hello world") + return result.final_output +``` + +**Parameters**: +- `agent` (Agent): Agent instance to run +- `messages` (str | list): Input message(s) + +**Returns**: Agent result object with `final_output` property + +## Tools + +### Durable Functions Activity Tools + +Durable Function Activities that execute as durable tool invocations. **This is the recommended approach for most use cases** as it provides the strongest correctness guarantees. - **When in doubt - this is the safe choice** + +```python +# 1. Define activity function +@app.activity_trigger(input_name="input_param") +async def my_activity(input_param): + # External API calls, database operations, etc. + return result + +# 2. Use in orchestrator +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def my_orchestrator(context): + agent = Agent( + tools=[context.create_activity_tool(my_activity)] + ) + # ... +``` + +**Components**: +- `@app.activity_trigger(input_name="param")`: Decorator for activity functions +- `context.create_activity_tool(activity_function)`: Creates tool from activity function + +**Best For**: External API calls, database operations, file I/O, expensive computations, non-deterministic operations + +### Open AI Function Tools + +Simple, deterministic tools that execute within the orchestration context. **Recommended only as a performance optimization when you're certain the tool meets all deterministic requirements.** + +```python +from agents import function_tool + +@function_tool +def calculate(expression: str) -> str: + """Calculate mathematical expressions.""" + return str(eval(expression)) +``` + +**Requirements**: +- Must be deterministic (same input → same output) +- Should be fast-executing +- No external API calls (use activity tools instead) +- Input/output must be JSON serializable + +**Best For**: Calculations, data transformations, validation logic, quick lookups + +### Current Limitations + +**MCP (Model Context Protocol)**: MCP tool support is not currently available. Use function tools or activity tools instead. 
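+
+## Retry Configuration
+
+Both model calls and activity tools use Durable `RetryOptions`. The integration's defaults are a 2-second first retry interval and up to 5 attempts. The example below is a sketch of overriding those defaults, reusing `my_activity` from the activity tools example above; the specific values are illustrative:
+
+```python
+import azure.durable_functions as df
+from agents import Agent, Runner
+
+retry = df.RetryOptions(
+    first_retry_interval_in_milliseconds=5000, max_number_of_attempts=3
+)
+
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator(model_retry_options=retry)
+def my_orchestrator(context):
+    agent = Agent(
+        name="Assistant",
+        instructions="Be helpful",
+        # Per-tool retry policy for a durable activity tool
+        tools=[context.create_activity_tool(my_activity, retry_options=retry)],
+    )
+    result = Runner.run_sync(agent, context.get_input())
+    return result.final_output
+```
+
+Pass `retry_options=None` to `create_activity_tool` to disable retries for a specific tool.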
+ +## Constraints + +Orchestration functions must be deterministic and replay-safe: + +- **Deterministic**: Same input always produces same output +- **Idempotent**: Safe to execute multiple times +- **Side-effect free**: No external calls in orchestration logic + +```python +# ✅ Good: Deterministic +def good_orchestrator(context): + input_data = context.get_input() + agent = high_priority_agent if input_data.get("priority") == "high" else standard_agent + return Runner.run_sync(agent, input_data["content"]) + +# ❌ Bad: Non-deterministic +def bad_orchestrator(context): + import random + agent = agent_a if random.choice([True, False]) else agent_b # Non-deterministic! + return Runner.run_sync(agent, context.get_input()) +``` \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a853b88a..42f4630d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ flake8-docstrings==1.5.0 pytest==7.1.2 python-dateutil==2.8.0 requests==2.32.4 -jsonschema==3.2.0 +jsonschema==4.25.1 aiohttp==3.12.14 azure-functions>=1.11.3b3 nox==2019.11.9 @@ -12,4 +12,7 @@ pytest-asyncio==0.20.2 autopep8 types-python-dateutil opentelemetry-api==1.32.1 -opentelemetry-sdk==1.32.1 \ No newline at end of file +opentelemetry-sdk==1.32.1 +openai==1.98.0 +openai-agents==0.2.4 +eval_type_backport diff --git a/samples-v2/openai_agents/.funcignore b/samples-v2/openai_agents/.funcignore new file mode 100644 index 00000000..9966315f --- /dev/null +++ b/samples-v2/openai_agents/.funcignore @@ -0,0 +1,8 @@ +.git* +.vscode +__azurite_db*__.json +__blobstorage__ +__queuestorage__ +local.settings.json +test +.venv \ No newline at end of file diff --git a/samples-v2/openai_agents/.gitignore b/samples-v2/openai_agents/.gitignore new file mode 100644 index 00000000..0a959b3c --- /dev/null +++ b/samples-v2/openai_agents/.gitignore @@ -0,0 +1,128 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don’t work, or not +# install all needed dependencies. 
+#Pipfile.lock + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Virtual Environment (additional patterns) +.myenv/ +.venv/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE and Editor Configuration +.code/ +.vscode/ +.idea/ + +# Test Reports and Results +*TEST_REPORT*.md +*test_results*.json +comprehensive_test_results.json +COMPREHENSIVE_TEST_REPORT.md +TEST_VALIDATION_REPORT.md + +# Azure Functions artifacts +bin +obj +appsettings.json +local.settings.json + +# Azurite artifacts +__blobstorage__ +__queuestorage__ +__azurite_db*__.json +.python_packages \ No newline at end of file diff --git a/samples-v2/openai_agents/README.md b/samples-v2/openai_agents/README.md new file mode 100644 index 00000000..acd92501 --- /dev/null +++ b/samples-v2/openai_agents/README.md @@ -0,0 +1,44 @@ +# OpenAI Agents with Azure Durable Functions - Samples + +This directory contains sample code demonstrating how to use OpenAI agents with Azure Durable Functions for reliable, stateful AI workflows. + +## 📖 Documentation + +**Complete documentation is located at: [/docs/openai_agents/](/docs/openai_agents/)** + +### Quick Links + +- **[Getting Started Guide](/docs/openai_agents/getting-started.md)** - Setup and basic usage +- **[API Reference](/docs/openai_agents/reference.md)** - Complete technical reference +- **[Overview](/docs/openai_agents/README.md)** - Feature overview and concepts + +## 🚀 Quick Start + +1. **Setup**: Follow the [Getting Started Guide](/docs/openai_agents/getting-started.md) +2. **Run Samples**: Explore the `/basic` directory for examples +3. **Reference**: Check [API Reference](/docs/openai_agents/reference.md) for advanced usage + +## 📂 Sample Structure + +``` +basic/ # Basic usage examples +├── hello_world.py # Simplest agent example +├── tools.py # Function and activity tools +├── dynamic_system_prompt.py # Dynamic prompt handling +├── lifecycle_example.py # Agent lifecycle management +└── ... # Additional examples +``` + +## 🔧 Running Samples + +```bash +# Install dependencies +pip install -r requirements.txt + +# Start the Azure Functions runtime +func start + +# Test with HTTP requests (see documentation for details) +``` + +**For complete setup instructions, configuration details, and troubleshooting, see the [Getting Started Guide](/docs/openai_agents/getting-started.md).** diff --git a/samples-v2/openai_agents/__init__.py b/samples-v2/openai_agents/__init__.py new file mode 100644 index 00000000..e333a2e3 --- /dev/null +++ b/samples-v2/openai_agents/__init__.py @@ -0,0 +1,3 @@ +# Make the examples directory into a package to avoid top-level module name collisions. +# This is needed so that mypy treats files like examples/customer_service/main.py and +# examples/researcher_app/main.py as distinct modules rather than both named "main". diff --git a/samples-v2/openai_agents/basic/agent_lifecycle_example.py b/samples-v2/openai_agents/basic/agent_lifecycle_example.py new file mode 100644 index 00000000..84fa09c9 --- /dev/null +++ b/samples-v2/openai_agents/basic/agent_lifecycle_example.py @@ -0,0 +1,97 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
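+# This sample demonstrates per-agent lifecycle hooks (AgentHooks): a start agent generates
+# a random number with a function tool and hands off to a multiply agent when the number is odd.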
+import random +from typing import Any + +from pydantic import BaseModel + +from agents import Agent, AgentHooks, RunContextWrapper, Runner, Tool, function_tool + + +class CustomAgentHooks(AgentHooks): + def __init__(self, display_name: str): + self.event_counter = 0 + self.display_name = display_name + + async def on_start(self, context: RunContextWrapper, agent: Agent) -> None: + self.event_counter += 1 + print(f"### ({self.display_name}) {self.event_counter}: Agent {agent.name} started") + + async def on_end(self, context: RunContextWrapper, agent: Agent, output: Any) -> None: + self.event_counter += 1 + print( + f"### ({self.display_name}) {self.event_counter}: Agent {agent.name} ended with output {output}" + ) + + async def on_handoff(self, context: RunContextWrapper, agent: Agent, source: Agent) -> None: + self.event_counter += 1 + print( + f"### ({self.display_name}) {self.event_counter}: Agent {source.name} handed off to {agent.name}" + ) + + async def on_tool_start(self, context: RunContextWrapper, agent: Agent, tool: Tool) -> None: + self.event_counter += 1 + print( + f"### ({self.display_name}) {self.event_counter}: Agent {agent.name} started tool {tool.name}" + ) + + async def on_tool_end( + self, context: RunContextWrapper, agent: Agent, tool: Tool, result: str + ) -> None: + self.event_counter += 1 + print( + f"### ({self.display_name}) {self.event_counter}: Agent {agent.name} ended tool {tool.name} with result {result}" + ) + + +### + + +@function_tool +def random_number(max: int) -> int: + """ + Generate a random number from 0 to max (inclusive). + """ + return random.randint(0, max) + + +@function_tool +def multiply_by_two(x: int) -> int: + """Simple multiplication by two.""" + return x * 2 + + +class FinalResult(BaseModel): + number: int + + +multiply_agent = Agent( + name="Multiply Agent", + instructions="Multiply the number by 2 and then return the final result.", + tools=[multiply_by_two], + output_type=FinalResult, + hooks=CustomAgentHooks(display_name="Multiply Agent"), +) + +start_agent = Agent( + name="Start Agent", + instructions="Generate a random number. If it's even, stop. If it's odd, hand off to the multiply agent.", + tools=[random_number], + output_type=FinalResult, + handoffs=[multiply_agent], + hooks=CustomAgentHooks(display_name="Start Agent"), +) + + +def main(): + # Default max number for demo + max_number = 250 + print(f"Generating random number between 0 and {max_number}") + + result = Runner.run_sync( + start_agent, + input=f"Generate a random number between 0 and {max_number}." + ) + + print("Done!") + return result.final_output diff --git a/samples-v2/openai_agents/basic/dynamic_system_prompt.py b/samples-v2/openai_agents/basic/dynamic_system_prompt.py new file mode 100644 index 00000000..4a1e8e9b --- /dev/null +++ b/samples-v2/openai_agents/basic/dynamic_system_prompt.py @@ -0,0 +1,42 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import random +from typing import Literal + +from agents import Agent, RunContextWrapper, Runner + + +class CustomContext: + def __init__(self, style: Literal["haiku", "pirate", "robot"]): + self.style = style + + +def custom_instructions( + run_context: RunContextWrapper[CustomContext], agent: Agent[CustomContext] +) -> str: + context = run_context.context + if context.style == "haiku": + return "Only respond in haikus." + elif context.style == "pirate": + return "Respond as a pirate." + else: + return "Respond as a robot and say 'beep boop' a lot." 
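+
+# Note: custom_instructions above is passed directly as `instructions` below, so the
+# system prompt is recomputed on each run from the CustomContext.style carried in the
+# run context.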
+
+
+agent = Agent(
+    name="Chat agent",
+    instructions=custom_instructions,
+)
+
+
+def main():
+    choice: Literal["haiku", "pirate", "robot"] = random.choice(["haiku", "pirate", "robot"])
+    context = CustomContext(style=choice)
+    print(f"Using style: {choice}\n")
+
+    user_message = "Tell me a joke."
+    print(f"User: {user_message}")
+    result = Runner.run_sync(agent, user_message, context=context)
+
+    print(f"Assistant: {result.final_output}")
+    return result.final_output
diff --git a/samples-v2/openai_agents/basic/hello_world.py new file mode 100644 index 00000000..57c4ca98 --- /dev/null +++ b/samples-v2/openai_agents/basic/hello_world.py @@ -0,0 +1,16 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+from agents import Agent, Runner
+
+
+def main():
+    agent = Agent(
+        name="Assistant",
+        instructions="You only respond in haikus.",
+    )
+
+    result = Runner.run_sync(agent, "Tell me about recursion in programming.")
+    return result.final_output
+    # Function calls itself,
+    # Looping in smaller pieces,
+    # Endless by design.
diff --git a/samples-v2/openai_agents/basic/lifecycle_example.py new file mode 100644 index 00000000..0e9d4973 --- /dev/null +++ b/samples-v2/openai_agents/basic/lifecycle_example.py @@ -0,0 +1,119 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import random
+from typing import Any, Optional
+
+from pydantic import BaseModel
+
+from agents import Agent, RunContextWrapper, RunHooks, Runner, Tool, Usage, function_tool
+from agents.items import ModelResponse, TResponseInputItem
+
+
+class ExampleHooks(RunHooks):
+    def __init__(self):
+        self.event_counter = 0
+
+    def _usage_to_str(self, usage: Usage) -> str:
+        return f"{usage.requests} requests, {usage.input_tokens} input tokens, {usage.output_tokens} output tokens, {usage.total_tokens} total tokens"
+
+    async def on_agent_start(self, context: RunContextWrapper, agent: Agent) -> None:
+        self.event_counter += 1
+        print(
+            f"### {self.event_counter}: Agent {agent.name} started. Usage: {self._usage_to_str(context.usage)}"
+        )
+
+    async def on_llm_start(
+        self,
+        context: RunContextWrapper,
+        agent: Agent,
+        system_prompt: Optional[str],
+        input_items: list[TResponseInputItem],
+    ) -> None:
+        self.event_counter += 1
+        print(f"### {self.event_counter}: LLM started. Usage: {self._usage_to_str(context.usage)}")
+
+    async def on_llm_end(
+        self, context: RunContextWrapper, agent: Agent, response: ModelResponse
+    ) -> None:
+        self.event_counter += 1
+        print(f"### {self.event_counter}: LLM ended. Usage: {self._usage_to_str(context.usage)}")
+
+    async def on_agent_end(self, context: RunContextWrapper, agent: Agent, output: Any) -> None:
+        self.event_counter += 1
+        print(
+            f"### {self.event_counter}: Agent {agent.name} ended with output {output}. Usage: {self._usage_to_str(context.usage)}"
+        )
+
+    async def on_tool_start(self, context: RunContextWrapper, agent: Agent, tool: Tool) -> None:
+        self.event_counter += 1
+        print(
+            f"### {self.event_counter}: Tool {tool.name} started. Usage: {self._usage_to_str(context.usage)}"
+        )
+
+    async def on_tool_end(
+        self, context: RunContextWrapper, agent: Agent, tool: Tool, result: str
+    ) -> None:
+        self.event_counter += 1
+        print(
+            f"### {self.event_counter}: Tool {tool.name} ended with result {result}. 
Usage: {self._usage_to_str(context.usage)}" + ) + + async def on_handoff( + self, context: RunContextWrapper, from_agent: Agent, to_agent: Agent + ) -> None: + self.event_counter += 1 + print( + f"### {self.event_counter}: Handoff from {from_agent.name} to {to_agent.name}. Usage: {self._usage_to_str(context.usage)}" + ) + + +hooks = ExampleHooks() + +### + + +@function_tool +def random_number(max: int) -> int: + """Generate a random number from 0 to max (inclusive).""" + return random.randint(0, max) + + +@function_tool +def multiply_by_two(x: int) -> int: + """Return x times two.""" + return x * 2 + + +class FinalResult(BaseModel): + number: int + + +multiply_agent = Agent( + name="Multiply Agent", + instructions="Multiply the number by 2 and then return the final result.", + tools=[multiply_by_two], + output_type=FinalResult, +) + +start_agent = Agent( + name="Start Agent", + instructions="Generate a random number. If it's even, stop. If it's odd, hand off to the multiplier agent.", + tools=[random_number], + output_type=FinalResult, + handoffs=[multiply_agent], +) + + +def main(): + # Default max number for demo + max_number = 250 + print(f"Enter a max number: {max_number}") + + result = Runner.run_sync( + start_agent, + input=f"Generate a random number between 0 and {max_number}.", + hooks=hooks, + ) + + print("Done!") + return result.final_output diff --git a/samples-v2/openai_agents/basic/local_image.py b/samples-v2/openai_agents/basic/local_image.py new file mode 100644 index 00000000..53d29380 --- /dev/null +++ b/samples-v2/openai_agents/basic/local_image.py @@ -0,0 +1,21 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from agents import Agent, Runner + + +def main(): + # Note: In a real implementation, you would handle image upload/attachment + # This simplified version demonstrates the pattern + agent = Agent( + name="Image Assistant", + instructions="You are a helpful assistant that can analyze images.", + ) + + # Simulated image analysis for the demo + message = "I have uploaded a local image. Please describe what you see in it." + + # Note: In a real scenario, you would include the actual image data + # For this demo, we'll simulate the response + result = Runner.run_sync(agent, message) + print(result.final_output) + return result.final_output diff --git a/samples-v2/openai_agents/basic/non_strict_output_type.py b/samples-v2/openai_agents/basic/non_strict_output_type.py new file mode 100644 index 00000000..e8271a40 --- /dev/null +++ b/samples-v2/openai_agents/basic/non_strict_output_type.py @@ -0,0 +1,27 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from pydantic import BaseModel +from typing import Optional + +from agents import Agent, Runner + + +class WeatherInfo(BaseModel): + city: str + temperature: Optional[str] = None + conditions: Optional[str] = None + humidity: Optional[str] = None + + +def main(): + # Using non-strict mode allows the model to return partial or flexible output + agent = Agent( + name="Weather Assistant", + instructions="Provide weather information for the requested city. 
Return as much detail as available.", + output_type=WeatherInfo, + # Note: In real implementation, you might set strict=False for more flexible output + ) + + result = Runner.run_sync(agent, "What's the weather like in Tokyo?") + print(result.final_output) + return result.final_output diff --git a/samples-v2/openai_agents/basic/previous_response_id.py b/samples-v2/openai_agents/basic/previous_response_id.py new file mode 100644 index 00000000..b525234d --- /dev/null +++ b/samples-v2/openai_agents/basic/previous_response_id.py @@ -0,0 +1,23 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from agents import Agent, Runner + + +def main(): + agent = Agent( + name="Memory Assistant", + instructions="You are a helpful assistant with memory of previous conversations.", + ) + + # First conversation + print("First interaction:") + result1 = Runner.run_sync(agent, "My name is John and I like pizza.") + print(f"Assistant: {result1.final_output}") + + # Note: In a real implementation, you would use the previous_response_id + # to maintain conversation context across multiple runs + print("\nSecond interaction (remembering previous context):") + result2 = Runner.run_sync(agent, "What did I tell you about my food preferences?") + print(f"Assistant: {result2.final_output}") + + return result2.final_output diff --git a/samples-v2/openai_agents/basic/remote_image.py b/samples-v2/openai_agents/basic/remote_image.py new file mode 100644 index 00000000..bbb3eec2 --- /dev/null +++ b/samples-v2/openai_agents/basic/remote_image.py @@ -0,0 +1,20 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from agents import Agent, Runner + + +def main(): + agent = Agent( + name="Remote Image Assistant", + instructions="You are a helpful assistant that can analyze images from URLs.", + ) + + # Example with a hypothetical remote image URL + image_url = "https://example.com/sample-image.jpg" + message = f"Please analyze this image from the URL: {image_url}" + + # Note: In a real implementation, you would handle the remote image URL + # and include it in the message or as an attachment + result = Runner.run_sync(agent, message) + print(result.final_output) + return result.final_output diff --git a/samples-v2/openai_agents/basic/tools.py b/samples-v2/openai_agents/basic/tools.py new file mode 100644 index 00000000..c8937589 --- /dev/null +++ b/samples-v2/openai_agents/basic/tools.py @@ -0,0 +1,31 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from pydantic import BaseModel + +from agents import Agent, Runner, function_tool + + +class Weather(BaseModel): + city: str + temperature_range: str + conditions: str + + +@function_tool +def get_weather(city: str) -> Weather: + """Get the current weather information for a specified city.""" + print("[debug] get_weather called") + return Weather(city=city, temperature_range="14-20C", conditions="Sunny with wind.") + + +agent = Agent( + name="Hello world", + instructions="You are a helpful agent.", + tools=[get_weather], +) + + +def main(): + result = Runner.run_sync(agent, input="What's the weather in Tokyo?") + print(result.final_output) + return result.final_output diff --git a/samples-v2/openai_agents/function_app.py b/samples-v2/openai_agents/function_app.py new file mode 100644 index 00000000..68805de7 --- /dev/null +++ b/samples-v2/openai_agents/function_app.py @@ -0,0 +1,117 @@ +# Copyright (c) Microsoft Corporation. 
All rights reserved. +# Licensed under the MIT License. +import os +import random + +import azure.functions as func +import azure.durable_functions as df + +from azure.identity import DefaultAzureCredential +from openai import AsyncAzureOpenAI + +from agents import set_default_openai_client + + +#region Regular Azure OpenAI setup + +# Initialize Azure credential +credential = DefaultAzureCredential() + +# Token provider function that returns the token +def get_azure_token(): + return credential.get_token("https://cognitiveservices.azure.com/.default").token + +# Initialize Azure OpenAI client with DefaultAzureCredential +openai_client = AsyncAzureOpenAI( + azure_ad_token_provider=get_azure_token, + api_version=os.getenv("AZURE_OPENAI_API_VERSION", "2025-03-01-preview"), + azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), + azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4") +) + +# Set the default OpenAI client for the Agents SDK +set_default_openai_client(openai_client) + +# endregion + + +app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION) + +@app.route(route="orchestrators/{functionName}") +@app.durable_client_input(client_name="client") +async def orchestration_starter(req: func.HttpRequest, client): + function_name = req.route_params.get('functionName') + # Starting a new orchestration instance in the most regular way + instance_id = await client.start_new(function_name) + response = client.create_check_status_response(req, instance_id) + return response + + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def hello_world(context): + import basic.hello_world + return basic.hello_world.main() + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def agent_lifecycle_example(context): + import basic.agent_lifecycle_example + return basic.agent_lifecycle_example.main() + + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def dynamic_system_prompt(context): + import basic.dynamic_system_prompt + return basic.dynamic_system_prompt.main() + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def lifecycle_example(context): + import basic.lifecycle_example + return basic.lifecycle_example.main() + + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def local_image(context): + import basic.local_image + return basic.local_image.main() + + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def non_strict_output_type(context): + import basic.non_strict_output_type + return basic.non_strict_output_type.main() + + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def previous_response_id(context): + import basic.previous_response_id + return basic.previous_response_id.main() + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def remote_image(context): + import basic.remote_image + return basic.remote_image.main() + +@app.orchestration_trigger(context_name="context") +@app.durable_openai_agent_orchestrator +def tools(context): + import basic.tools + return basic.tools.main() + +@app.activity_trigger(input_name="max") +async def random_number_tool(max: int) -> int: + """Return a random integer between 0 and the given maximum.""" + return random.randint(0, max) + +@app.orchestration_trigger(context_name="context") 
+@app.durable_openai_agent_orchestrator +def message_filter(context): + import handoffs.message_filter + return handoffs.message_filter.main(context.create_activity_tool(random_number_tool)) diff --git a/samples-v2/openai_agents/handoffs/message_filter.py b/samples-v2/openai_agents/handoffs/message_filter.py new file mode 100644 index 00000000..53bada31 --- /dev/null +++ b/samples-v2/openai_agents/handoffs/message_filter.py @@ -0,0 +1,175 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +from __future__ import annotations + +import json + +from agents import Agent, HandoffInputData, Runner, function_tool, handoff +from agents.extensions import handoff_filters +from agents.models import is_gpt_5_default + + +def spanish_handoff_message_filter(handoff_message_data: HandoffInputData) -> HandoffInputData: + if is_gpt_5_default(): + print("gpt-5 is enabled, so we're not filtering the input history") + # when using gpt-5, removing some of the items could break things, so we do this filtering only for other models + return HandoffInputData( + input_history=handoff_message_data.input_history, + pre_handoff_items=tuple(handoff_message_data.pre_handoff_items), + new_items=tuple(handoff_message_data.new_items), + ) + + # First, we'll remove any tool-related messages from the message history + handoff_message_data = handoff_filters.remove_all_tools(handoff_message_data) + + # Second, we'll also remove the first two items from the history, just for demonstration + history = ( + tuple(handoff_message_data.input_history[2:]) + if isinstance(handoff_message_data.input_history, tuple) + else handoff_message_data.input_history + ) + + # or, you can use the HandoffInputData.clone(kwargs) method + return HandoffInputData( + input_history=history, + pre_handoff_items=tuple(handoff_message_data.pre_handoff_items), + new_items=tuple(handoff_message_data.new_items), + ) + + +def main(random_number_tool): + first_agent = Agent( + name="Assistant", + instructions="Be extremely concise.", + tools=[random_number_tool], + ) + + spanish_agent = Agent( + name="Spanish Assistant", + instructions="You only speak Spanish and are extremely concise.", + handoff_description="A Spanish-speaking assistant.", + ) + + second_agent = Agent( + name="Assistant", + instructions=( + "Be a helpful assistant. If the user speaks Spanish, handoff to the Spanish assistant." + ), + handoffs=[handoff(spanish_agent, input_filter=spanish_handoff_message_filter)], + ) + + # 1. Send a regular message to the first agent + result = Runner.run_sync(first_agent, input="Hi, my name is Sora.") + + print("Step 1 done") + + # 2. Ask it to generate a number + result = Runner.run_sync( + first_agent, + input=result.to_input_list() + + [{"content": "Can you generate a random number between 0 and 100?", "role": "user"}], + ) + + print("Step 2 done") + + # 3. Call the second agent + result = Runner.run_sync( + second_agent, + input=result.to_input_list() + + [ + { + "content": "I live in New York City. Whats the population of the city?", + "role": "user", + } + ], + ) + + print("Step 3 done") + + # 4. Cause a handoff to occur + result = Runner.run_sync( + second_agent, + input=result.to_input_list() + + [ + { + "content": "Por favor habla en español. ¿Cuál es mi nombre y dónde vivo?", + "role": "user", + } + ], + ) + + print("Step 4 done") + + print("\n===Final messages===\n") + + # 5. 
That should have caused spanish_handoff_message_filter to be called, which means the + # output should be missing the first two messages, and have no tool calls. + # Let's print the messages to see what happened + for message in result.to_input_list(): + print(json.dumps(message, indent=2)) + # tool_calls = message.tool_calls if isinstance(message, AssistantMessage) else None + + # print(f"{message.role}: {message.content}\n - Tool calls: {tool_calls or 'None'}") + """ + $python examples/handoffs/message_filter.py + Step 1 done + Step 2 done + Step 3 done + Step 4 done + + ===Final messages=== + + { + "content": "Can you generate a random number between 0 and 100?", + "role": "user" + } + { + "id": "...", + "content": [ + { + "annotations": [], + "text": "Sure! Here's a random number between 0 and 100: **42**.", + "type": "output_text" + } + ], + "role": "assistant", + "status": "completed", + "type": "message" + } + { + "content": "I live in New York City. Whats the population of the city?", + "role": "user" + } + { + "id": "...", + "content": [ + { + "annotations": [], + "text": "As of the most recent estimates, the population of New York City is approximately 8.6 million people. However, this number is constantly changing due to various factors such as migration and birth rates. For the latest and most accurate information, it's always a good idea to check the official data from sources like the U.S. Census Bureau.", + "type": "output_text" + } + ], + "role": "assistant", + "status": "completed", + "type": "message" + } + { + "content": "Por favor habla en espa\u00f1ol. \u00bfCu\u00e1l es mi nombre y d\u00f3nde vivo?", + "role": "user" + } + { + "id": "...", + "content": [ + { + "annotations": [], + "text": "No tengo acceso a esa informaci\u00f3n personal, solo s\u00e9 lo que me has contado: vives en Nueva York.", + "type": "output_text" + } + ], + "role": "assistant", + "status": "completed", + "type": "message" + } + """ + + return result.final_output diff --git a/samples-v2/openai_agents/host.json b/samples-v2/openai_agents/host.json new file mode 100644 index 00000000..069e8f88 --- /dev/null +++ b/samples-v2/openai_agents/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.29.0, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples-v2/openai_agents/local.settings.json.template b/samples-v2/openai_agents/local.settings.json.template new file mode 100644 index 00000000..71126517 --- /dev/null +++ b/samples-v2/openai_agents/local.settings.json.template @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "FUNCTIONS_WORKER_RUNTIME": "python", + "AZURE_OPENAI_ENDPOINT": "https://your-openai-service.openai.azure.com/", + "AZURE_OPENAI_DEPLOYMENT": "your-gpt-deployment-name", + "AZURE_OPENAI_API_VERSION": "2025-03-01-preview" + } +} \ No newline at end of file diff --git a/samples-v2/openai_agents/requirements.txt b/samples-v2/openai_agents/requirements.txt new file mode 100644 index 00000000..89ebebe2 --- /dev/null +++ b/samples-v2/openai_agents/requirements.txt @@ -0,0 +1,10 @@ +# DO NOT include 
azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues + +azure-functions +azure-functions-durable +azure-identity +openai==1.107.3 +openai-agents==0.3.0 +pydantic diff --git a/samples-v2/openai_agents/test_orchestrators.py b/samples-v2/openai_agents/test_orchestrators.py new file mode 100755 index 00000000..63bdf223 --- /dev/null +++ b/samples-v2/openai_agents/test_orchestrators.py @@ -0,0 +1,371 @@ +#!/usr/bin/env python3 +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +""" +Test script for OpenAI Agents with Durable Functions Extension +This script tests all orchestrators as specified in the instructions document. +""" + +import requests +import json +import time +import argparse +import os +from typing import Dict, List, Tuple, Optional +import re + +# List of orchestrators to test based on the instructions +ORCHESTRATORS = [ + "agent_lifecycle_example", + "dynamic_system_prompt", + "hello_world", + "lifecycle_example", + "local_image", + "non_strict_output_type", + "previous_response_id", + "remote_image", + "tools", + "message_filter", +] + +BASE_URL = "http://localhost:7071/api/orchestrators" +TIMEOUT_SECONDS = 60 # Maximum time to wait for orchestration completion +POLL_INTERVAL = 2 # Seconds between status checks + +def extract_status_url(orchestration_response: str) -> Optional[str]: + """ + Extract the status query URL from orchestration response + """ + try: + response_data = json.loads(orchestration_response) + return response_data.get("statusQueryGetUri") + except: + return None + +def get_orchestration_status(status_url: str) -> Tuple[str, Optional[str], Optional[str]]: + """ + Get the current status of an orchestration + Returns: (runtime_status, output, error_details) + """ + try: + response = requests.get(status_url, timeout=10) + if response.status_code in [200, 202]: # Both 200 and 202 are valid responses + status_data = json.loads(response.text) + runtime_status = status_data.get("runtimeStatus", "Unknown") + output = status_data.get("output") + return runtime_status, output, None + else: + return "Error", None, f"HTTP {response.status_code}: {response.text}" + except Exception as e: + return "Error", None, f"Status check failed: {str(e)}" + +def wait_for_completion(status_url: str, orchestrator_name: str) -> Tuple[str, Optional[str], Optional[str]]: + """ + Wait for orchestration to complete and return final status + Returns: (final_status, output, error_details) + """ + print(f" ⏳ Waiting for {orchestrator_name} to complete...") + + start_time = time.time() + while time.time() - start_time < TIMEOUT_SECONDS: + status, output, error = get_orchestration_status(status_url) + + print(f" 📊 Status: {status}") + + # Terminal states + if status in ["Completed", "Failed", "Terminated", "Canceled"]: + return status, output, error + + # Continue waiting for non-terminal states + if status in ["Running", "Pending"]: + time.sleep(POLL_INTERVAL) + continue + + # Unknown status - might be an error + if status == "Error": + return status, output, error + + # Any other status, keep waiting + time.sleep(POLL_INTERVAL) + + # Timeout reached + return "Timeout", None, f"Orchestration did not complete within {TIMEOUT_SECONDS} seconds" + +def test_orchestrator_full(orchestrator_name: str) -> Dict: + """ + Test a single orchestrator end-to-end including completion + Returns: detailed test result dictionary + """ + print(f"\n🧪 
Testing {orchestrator_name}...") + result = { + "name": orchestrator_name, + "startup_success": False, + "startup_response": None, + "startup_error": None, + "status_url": None, + "instance_id": None, + "final_status": None, + "output": None, + "execution_error": None, + "execution_time": None + } + + try: + # Step 1: Start orchestration + print(f" 🚀 Starting orchestration...") + url = f"{BASE_URL}/{orchestrator_name}" + start_time = time.time() + + response = requests.post(url, timeout=30) + + if response.status_code in [200, 202]: + result["startup_success"] = True + result["startup_response"] = response.text + + # Extract instance ID and status URL + try: + response_data = json.loads(response.text) + result["instance_id"] = response_data.get("id") + result["status_url"] = response_data.get("statusQueryGetUri") + print(f" ✅ Started successfully (Instance: {result['instance_id']})") + except: + print(f" ⚠️ Started but couldn't parse response") + + else: + result["startup_error"] = f"HTTP {response.status_code}: {response.text}" + print(f" ❌ Startup failed: {result['startup_error']}") + return result + + except Exception as e: + result["startup_error"] = f"Request failed: {str(e)}" + print(f" ❌ Startup failed: {result['startup_error']}") + return result + + # Step 2: Wait for completion if we have a status URL + if result["status_url"]: + try: + final_status, output, error = wait_for_completion(result["status_url"], orchestrator_name) + result["final_status"] = final_status + result["output"] = output + result["execution_error"] = error + result["execution_time"] = time.time() - start_time + + if final_status == "Completed": + print(f" ✅ Completed successfully in {result['execution_time']:.1f}s") + if output: + print(f" 📝 Output: {str(output)[:100]}{'...' if len(str(output)) > 100 else ''}") + elif final_status == "Failed": + print(f" ❌ Failed after {result['execution_time']:.1f}s") + if error: + # Extract key error information + error_summary = str(error)[:200] + "..." 
if len(str(error)) > 200 else str(error) + print(f" 🔍 Error: {error_summary}") + else: + print(f" ⚠️ Ended with status: {final_status}") + + except Exception as e: + result["execution_error"] = f"Status monitoring failed: {str(e)}" + print(f" ❌ Status monitoring failed: {result['execution_error']}") + else: + print(f" ⚠️ No status URL available for monitoring") + + return result + +def run_all_tests() -> Dict: + """ + Run comprehensive tests for all orchestrators and return results + """ + print("🧪 Starting OpenAI Agents with Durable Functions Extension - Comprehensive Test Suite") + print("=" * 80) + + results = { + "test_results": [], + "summary": {} + } + + for i, orchestrator in enumerate(ORCHESTRATORS, 1): + print(f"\n[{i}/{len(ORCHESTRATORS)}] " + "="*60) + test_result = test_orchestrator_full(orchestrator) + results["test_results"].append(test_result) + + # Small delay between tests to avoid overwhelming the system + if i < len(ORCHESTRATORS): + print(f" ⏸️ Waiting {POLL_INTERVAL}s before next test...") + time.sleep(POLL_INTERVAL) + + # Calculate summary statistics + total = len(ORCHESTRATORS) + startup_successful = sum(1 for r in results["test_results"] if r["startup_success"]) + execution_completed = sum(1 for r in results["test_results"] if r["final_status"] == "Completed") + execution_failed = sum(1 for r in results["test_results"] if r["final_status"] == "Failed") + execution_timeout = sum(1 for r in results["test_results"] if r["final_status"] == "Timeout") + execution_other = sum(1 for r in results["test_results"] if r["final_status"] and r["final_status"] not in ["Completed", "Failed", "Timeout"]) + + results["summary"] = { + "total_tests": total, + "startup_successful": startup_successful, + "execution_completed": execution_completed, + "execution_failed": execution_failed, + "execution_timeout": execution_timeout, + "execution_other": execution_other, + "startup_success_rate": f"{(startup_successful/total)*100:.1f}%", + "execution_success_rate": f"{(execution_completed/total)*100:.1f}%" if total > 0 else "0%" + } + + return results + +def print_report(results: Dict): + """ + Print comprehensive test report + """ + print("\n" + "=" * 80) + print("📊 COMPREHENSIVE TEST VALIDATION REPORT") + print("=" * 80) + + # Summary + summary = results["summary"] + print(f"\n📈 SUMMARY STATISTICS:") + print(f" Total Tests: {summary['total_tests']}") + print(f" Startup Successful: {summary['startup_successful']}/{summary['total_tests']} ({summary['startup_success_rate']})") + print(f" Execution Completed: {summary['execution_completed']}/{summary['total_tests']} ({summary['execution_success_rate']})") + print(f" Execution Failed: {summary['execution_failed']}") + print(f" Execution Timeout: {summary['execution_timeout']}") + print(f" Execution Other: {summary['execution_other']}") + + # Detailed results by category + test_results = results["test_results"] + + # Startup successful tests + startup_successful = [r for r in test_results if r["startup_success"]] + if startup_successful: + print(f"\n✅ STARTUP SUCCESSFUL ({len(startup_successful)}):") + for test in startup_successful: + print(f" • {test['name']} (Instance: {test['instance_id'] or 'N/A'})") + + # Execution completed tests + execution_completed = [r for r in test_results if r["final_status"] == "Completed"] + if execution_completed: + print(f"\n🎉 EXECUTION COMPLETED ({len(execution_completed)}):") + for test in execution_completed: + exec_time = f" in {test['execution_time']:.1f}s" if test['execution_time'] else "" + print(f" • 
{test['name']}{exec_time}") + if test['output']: + output_preview = str(test['output'])[:100] + "..." if len(str(test['output'])) > 100 else str(test['output']) + print(f" Output: {output_preview}") + + # Execution failed tests + execution_failed = [r for r in test_results if r["final_status"] == "Failed"] + if execution_failed: + print(f"\n❌ EXECUTION FAILED ({len(execution_failed)}):") + for test in execution_failed: + exec_time = f" after {test['execution_time']:.1f}s" if test['execution_time'] else "" + print(f" • {test['name']}{exec_time}") + if test['execution_error']: + # Extract key error information + error_lines = str(test['execution_error']).split('\\n') + key_error = next((line for line in error_lines if 'RuntimeError:' in line or 'Exception:' in line), + str(test['execution_error'])[:150]) + print(f" Error: {key_error}") + + # Startup failed tests + startup_failed = [r for r in test_results if not r["startup_success"]] + if startup_failed: + print(f"\n🚫 STARTUP FAILED ({len(startup_failed)}):") + for test in startup_failed: + print(f" • {test['name']}") + print(f" Error: {test['startup_error']}") + + # Timeout tests + timeout_tests = [r for r in test_results if r["final_status"] == "Timeout"] + if timeout_tests: + print(f"\n⏰ EXECUTION TIMEOUT ({len(timeout_tests)}):") + for test in timeout_tests: + print(f" • {test['name']} (exceeded {TIMEOUT_SECONDS}s)") + + # Recommendations based on results + print(f"\n💡 ANALYSIS & RECOMMENDATIONS:") + + if summary['execution_completed'] == summary['total_tests']: + print(" 🎉 EXCELLENT: All orchestrators completed successfully!") + print(" • Integration is working correctly") + print(" • Ready for production use") + + elif summary['startup_successful'] == summary['total_tests'] and summary['execution_failed'] > 0: + print(" ⚠️ INFRASTRUCTURE OK, RUNTIME ISSUES DETECTED:") + print(" • Azure Functions integration is working correctly") + print(" • Orchestrators start successfully but fail during execution") + + # Analyze common error patterns + common_errors = {} + for test in execution_failed: + if test['execution_error']: + error_str = str(test['execution_error']) + if 'event loop' in error_str.lower(): + common_errors['AsyncIO Event Loop'] = common_errors.get('AsyncIO Event Loop', 0) + 1 + elif 'timeout' in error_str.lower(): + common_errors['Timeout'] = common_errors.get('Timeout', 0) + 1 + elif 'openai' in error_str.lower(): + common_errors['OpenAI API'] = common_errors.get('OpenAI API', 0) + 1 + else: + common_errors['Other'] = common_errors.get('Other', 0) + 1 + + if common_errors: + print(" • Common error patterns detected:") + for error_type, count in common_errors.items(): + print(f" - {error_type}: {count} occurrences") + + if 'AsyncIO Event Loop' in common_errors: + print(" • SOLUTION: Implement event loop fix in sample code") + print(" - See TEST_VALIDATION_REPORT.md for specific solutions") + + elif summary['startup_successful'] < summary['total_tests']: + print(" 🚨 INFRASTRUCTURE ISSUES DETECTED:") + print(" • Some orchestrators failed to start") + print(" • Check Azure Functions configuration") + print(" • Verify environment variables and dependencies") + + else: + print(" 🔍 MIXED RESULTS - Review individual test details above") + + print("\n" + "=" * 80) + +if __name__ == "__main__": + # Parse command line arguments + parser = argparse.ArgumentParser(description="Test OpenAI Agents with Durable Functions Extension") + parser.add_argument( + "--output", "-o", + default="comprehensive_test_results.json", + help="Output file 
path for test results (default: comprehensive_test_results.json)" + ) + args = parser.parse_args() + + # Check if Functions runtime is available + try: + response = requests.get("http://localhost:7071", timeout=5) + print("✅ Azure Functions runtime is running") + except: + print("❌ Azure Functions runtime is not accessible at http://localhost:7071") + print("Please ensure 'func start' is running in the project directory") + exit(1) + + # Run comprehensive tests + results = run_all_tests() + + # Print detailed report + print_report(results) + + # Save results to file + output_file = args.output + with open(output_file, "w") as f: + json.dump(results, f, indent=2, default=str) + + print(f"\n💾 Detailed results saved to: {os.path.basename(output_file)}") # Exit with appropriate code + if results["summary"]["execution_completed"] == results["summary"]["total_tests"]: + print("🎉 All tests completed successfully!") + exit(0) + elif results["summary"]["startup_successful"] == results["summary"]["total_tests"]: + print("⚠️ All orchestrators started but some failed during execution") + exit(1) + else: + print("🚨 Some orchestrators failed to start") + exit(2) diff --git a/setup.py b/setup.py index ab7a2965..206ae2ce 100644 --- a/setup.py +++ b/setup.py @@ -68,7 +68,7 @@ def run(self, *args, **kwargs): 'pytest==7.1.2', 'python-dateutil==2.8.0', 'requests==2.22.0', - 'jsonschema==3.2.0', + 'jsonschema==4.25.1', 'aiohttp==3.6.2', 'azure-functions>=1.2.0', 'nox==2019.11.9', diff --git a/tests/openai_agents/__init__.py b/tests/openai_agents/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/openai_agents/test_context.py b/tests/openai_agents/test_context.py new file mode 100644 index 00000000..155426d7 --- /dev/null +++ b/tests/openai_agents/test_context.py @@ -0,0 +1,466 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
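+#
+# These tests cover DurableAIAgentContext behavior: delegation of DurableOrchestrationContext
+# members through __getattr__, recording of call_activity / call_activity_with_retry via
+# TaskTracker, and create_activity_tool wrapping a durable activity as an agents FunctionTool.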
+import pytest +from unittest.mock import Mock, patch + +from azure.durable_functions.openai_agents.context import DurableAIAgentContext +from azure.durable_functions.openai_agents.task_tracker import TaskTracker +from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext +from azure.durable_functions.models.RetryOptions import RetryOptions + +from agents.tool import FunctionTool + + +class TestDurableAIAgentContext: + """Test suite for DurableAIAgentContext class.""" + + def _create_mock_orchestration_context(self): + """Create a mock DurableOrchestrationContext for testing.""" + orchestration_context = Mock(spec=DurableOrchestrationContext) + orchestration_context.call_activity = Mock(return_value="mock_task") + orchestration_context.call_activity_with_retry = Mock(return_value="mock_task_with_retry") + orchestration_context.instance_id = "test_instance_id" + orchestration_context.current_utc_datetime = "2023-01-01T00:00:00Z" + orchestration_context.is_replaying = False + return orchestration_context + + def _create_mock_task_tracker(self): + """Create a mock TaskTracker for testing.""" + task_tracker = Mock(spec=TaskTracker) + task_tracker.record_activity_call = Mock() + task_tracker.get_activity_call_result = Mock(return_value="activity_result") + task_tracker.get_activity_call_result_with_retry = Mock(return_value="retry_activity_result") + return task_tracker + + def _create_mock_activity_func(self, name="test_activity", input_name=None, + activity_name=None): + """Create a mock activity function with configurable parameters.""" + mock_activity_func = Mock() + mock_activity_func._function._name = name + mock_activity_func._function._func = lambda x: x + + if input_name is not None: + # Create trigger with input_name + mock_activity_func._function._trigger = Mock() + mock_activity_func._function._trigger.activity = activity_name + mock_activity_func._function._trigger.name = input_name + else: + # No trigger means no input_name + mock_activity_func._function._trigger = None + + return mock_activity_func + + def _setup_activity_tool_mocks(self, mock_function_tool, mock_function_schema, + activity_name="test_activity", description=""): + """Setup common mocks for function_schema and FunctionTool.""" + mock_schema = Mock() + mock_schema.name = activity_name + mock_schema.description = description + mock_schema.params_json_schema = {"type": "object"} + mock_function_schema.return_value = mock_schema + + mock_tool = Mock(spec=FunctionTool) + mock_function_tool.return_value = mock_tool + + return mock_tool + + def _invoke_activity_tool(self, run_activity, input_data): + """Helper to invoke the activity tool with asyncio.""" + mock_ctx = Mock() + import asyncio + return asyncio.run(run_activity(mock_ctx, input_data)) + + def _test_activity_tool_input_processing(self, input_name=None, input_data="", + expected_input_parameter_value="", + retry_options=None, + activity_name="test_activity"): + """Framework method that runs a complete input processing test.""" + with patch('azure.durable_functions.openai_agents.context.function_schema') \ + as mock_function_schema, \ + patch('azure.durable_functions.openai_agents.context.FunctionTool') \ + as mock_function_tool: + + # Setup + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + mock_activity_func = self._create_mock_activity_func( + name=activity_name, input_name=input_name) + self._setup_activity_tool_mocks( + mock_function_tool, 
mock_function_schema, activity_name) + + # Create context and tool + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + ai_context.create_activity_tool(mock_activity_func, retry_options=retry_options) + + # Get and invoke the run_activity function + call_args = mock_function_tool.call_args + run_activity = call_args[1]['on_invoke_tool'] + self._invoke_activity_tool(run_activity, input_data) + + # Verify the expected call was made + if retry_options: + task_tracker.get_activity_call_result_with_retry.assert_called_once_with( + activity_name, retry_options, expected_input_parameter_value + ) + else: + task_tracker.get_activity_call_result.assert_called_once_with( + activity_name, expected_input_parameter_value + ) + + def test_init_creates_context_successfully(self): + """Test that __init__ creates a DurableAIAgentContext successfully.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + retry_options = RetryOptions(1000, 3) + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, retry_options) + + assert isinstance(ai_context, DurableAIAgentContext) + assert not isinstance(ai_context, DurableOrchestrationContext) + + def test_call_activity_delegates_and_records(self): + """Test that call_activity delegates to context and records activity call.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + result = ai_context.call_activity("test_activity", "test_input") + + orchestration_context.call_activity.assert_called_once_with("test_activity", "test_input") + task_tracker.record_activity_call.assert_called_once() + assert result == "mock_task" + + def test_call_activity_with_retry_delegates_and_records(self): + """Test that call_activity_with_retry delegates to context and records activity call.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + retry_options = RetryOptions(1000, 3) + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + result = ai_context.call_activity_with_retry("test_activity", retry_options, "test_input") + + orchestration_context.call_activity_with_retry.assert_called_once_with( + "test_activity", retry_options, "test_input" + ) + task_tracker.record_activity_call.assert_called_once() + assert result == "mock_task_with_retry" + + @patch('azure.durable_functions.openai_agents.context.function_schema') + @patch('azure.durable_functions.openai_agents.context.FunctionTool') + def test_activity_as_tool_creates_function_tool(self, mock_function_tool, mock_function_schema): + """Test that create_activity_tool creates a FunctionTool with correct parameters.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + # Mock the activity function + mock_activity_func = Mock() + mock_activity_func._function._name = "test_activity" + mock_activity_func._function._func = lambda x: x + + # Mock the schema + mock_schema = Mock() + mock_schema.name = "test_activity" + mock_schema.description = "Test activity description" + mock_schema.params_json_schema = {"type": "object"} + mock_function_schema.return_value = mock_schema + + # Mock FunctionTool + mock_tool = Mock(spec=FunctionTool) + mock_function_tool.return_value = mock_tool + + ai_context = 
DurableAIAgentContext(orchestration_context, task_tracker, None) + retry_options = RetryOptions(1000, 3) + + result = ai_context.create_activity_tool( + mock_activity_func, + description="Custom description", + retry_options=retry_options + ) + + # Verify function_schema was called correctly + mock_function_schema.assert_called_once_with( + func=mock_activity_func._function._func, + docstring_style=None, + description_override="Custom description", + use_docstring_info=True, + strict_json_schema=True, + ) + + # Verify FunctionTool was created correctly + mock_function_tool.assert_called_once() + call_args = mock_function_tool.call_args + assert call_args[1]['name'] == "test_activity" + assert call_args[1]['description'] == "Test activity description" + assert call_args[1]['params_json_schema'] == {"type": "object"} + assert call_args[1]['strict_json_schema'] is True + assert callable(call_args[1]['on_invoke_tool']) + + assert result is mock_tool + + @patch('azure.durable_functions.openai_agents.context.function_schema') + @patch('azure.durable_functions.openai_agents.context.FunctionTool') + def test_activity_as_tool_with_default_retry_options(self, mock_function_tool, mock_function_schema): + """Test that create_activity_tool uses default retry options when none provided.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + mock_activity_func = Mock() + mock_activity_func._function._name = "test_activity" + mock_activity_func._function._func = lambda x: x + + mock_schema = Mock() + mock_schema.name = "test_activity" + mock_schema.description = "Test description" + mock_schema.params_json_schema = {"type": "object"} + mock_function_schema.return_value = mock_schema + + mock_tool = Mock(spec=FunctionTool) + mock_function_tool.return_value = mock_tool + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + + # Call with default retry options + result = ai_context.create_activity_tool(mock_activity_func) + + # Should still create the tool successfully + assert result is mock_tool + mock_function_tool.assert_called_once() + + @patch('azure.durable_functions.openai_agents.context.function_schema') + @patch('azure.durable_functions.openai_agents.context.FunctionTool') + def test_activity_as_tool_run_activity_with_retry(self, mock_function_tool, mock_function_schema): + """Test that the run_activity function calls task tracker with retry options.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + mock_activity_func = Mock() + mock_activity_func._function._name = "test_activity" + mock_activity_func._function._trigger = None + mock_activity_func._function._func = lambda x: x + + mock_schema = Mock() + mock_schema.name = "test_activity" + mock_schema.description = "" + mock_schema.params_json_schema = {"type": "object"} + mock_function_schema.return_value = mock_schema + + mock_tool = Mock(spec=FunctionTool) + mock_function_tool.return_value = mock_tool + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + retry_options = RetryOptions(1000, 3) + + ai_context.create_activity_tool(mock_activity_func, retry_options=retry_options) + + # Get the run_activity function that was passed to FunctionTool + call_args = mock_function_tool.call_args + run_activity = call_args[1]['on_invoke_tool'] + + # Create a mock context wrapper + mock_ctx = Mock() + + # Call the run_activity function + import asyncio + 
result = asyncio.run(run_activity(mock_ctx, "test_input")) + + # Verify the task tracker was called with retry options + task_tracker.get_activity_call_result_with_retry.assert_called_once_with( + "test_activity", retry_options, "test_input" + ) + assert result == "retry_activity_result" + + @patch('azure.durable_functions.openai_agents.context.function_schema') + @patch('azure.durable_functions.openai_agents.context.FunctionTool') + def test_activity_as_tool_run_activity_without_retry(self, mock_function_tool, mock_function_schema): + """Test that the run_activity function calls task tracker without retry when retry_options is None.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + mock_activity_func = Mock() + mock_activity_func._function._name = "test_activity" + mock_activity_func._function._trigger = None + mock_activity_func._function._func = lambda x: x + + mock_schema = Mock() + mock_schema.name = "test_activity" + mock_schema.description = "" + mock_schema.params_json_schema = {"type": "object"} + mock_function_schema.return_value = mock_schema + + mock_tool = Mock(spec=FunctionTool) + mock_function_tool.return_value = mock_tool + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + + ai_context.create_activity_tool(mock_activity_func, retry_options=None) + + # Get the run_activity function that was passed to FunctionTool + call_args = mock_function_tool.call_args + run_activity = call_args[1]['on_invoke_tool'] + + # Create a mock context wrapper + mock_ctx = Mock() + + # Call the run_activity function + import asyncio + result = asyncio.run(run_activity(mock_ctx, "test_input")) + + # Verify the task tracker was called without retry options + task_tracker.get_activity_call_result.assert_called_once_with( + "test_activity", "test_input" + ) + assert result == "activity_result" + + @patch('azure.durable_functions.openai_agents.context.function_schema') + @patch('azure.durable_functions.openai_agents.context.FunctionTool') + def test_activity_as_tool_extracts_activity_name_from_trigger(self, mock_function_tool, mock_function_schema): + """Test that the run_activity function calls task tracker with the activity name specified in the trigger.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + mock_activity_func = Mock() + mock_activity_func._function._name = "test_activity" + mock_activity_func._function._trigger.activity = "activity_name_from_trigger" + mock_activity_func._function._func = lambda x: x + + mock_schema = Mock() + mock_schema.name = "test_activity" + mock_schema.description = "" + mock_schema.params_json_schema = {"type": "object"} + mock_function_schema.return_value = mock_schema + + mock_tool = Mock(spec=FunctionTool) + mock_function_tool.return_value = mock_tool + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + + ai_context.create_activity_tool(mock_activity_func, retry_options=None) + + # Get the run_activity function that was passed to FunctionTool + call_args = mock_function_tool.call_args + run_activity = call_args[1]['on_invoke_tool'] + + # Create a mock context wrapper + mock_ctx = Mock() + + # Call the run_activity function + import asyncio + result = asyncio.run(run_activity(mock_ctx, "test_input")) + + # Verify the task tracker was called without retry options + task_tracker.get_activity_call_result.assert_called_once_with( + 
"activity_name_from_trigger", "test_input" + ) + assert result == "activity_result" + + def test_create_activity_tool_parses_json_input_with_input_name(self): + """Test JSON input parsing and named value extraction with input_name.""" + self._test_activity_tool_input_processing( + input_name="max", + input_data='{"max": 100}', + expected_input_parameter_value=100, + activity_name="random_number_tool" + ) + + def test_create_activity_tool_handles_non_json_input_gracefully(self): + """Test non-JSON input passes through unchanged with input_name.""" + self._test_activity_tool_input_processing( + input_name="param", + input_data="not json", + expected_input_parameter_value="not json" + ) + + def test_create_activity_tool_handles_json_missing_named_parameter(self): + """Test JSON input without named parameter passes through unchanged.""" + json_input = '{"other_param": 200}' + self._test_activity_tool_input_processing( + input_name="expected_param", + input_data=json_input, + expected_input_parameter_value=json_input + ) + + def test_create_activity_tool_handles_malformed_json_gracefully(self): + """Test malformed JSON passes through unchanged.""" + malformed_json = '{"param": 100' # Missing closing brace + self._test_activity_tool_input_processing( + input_name="param", + input_data=malformed_json, + expected_input_parameter_value=malformed_json + ) + + def test_create_activity_tool_json_parsing_works_with_retry_options(self): + """Test JSON parsing works correctly with retry options.""" + retry_options = RetryOptions(1000, 3) + self._test_activity_tool_input_processing( + input_name="value", + input_data='{"value": "test_data"}', + expected_input_parameter_value="test_data", + retry_options=retry_options + ) + + def test_create_activity_tool_no_input_name_passes_through_json(self): + """Test JSON input passes through unchanged when no input_name.""" + json_input = '{"param": 100}' + self._test_activity_tool_input_processing( + input_name=None, # No input_name + input_data=json_input, + expected_input_parameter_value=json_input + ) + + def test_context_delegation_methods_work(self): + """Test that common context methods work through delegation.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + # Add some mock methods to the orchestration context + orchestration_context.wait_for_external_event = Mock(return_value="external_event_task") + orchestration_context.create_timer = Mock(return_value="timer_task") + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + + # These should work through delegation + result1 = ai_context.wait_for_external_event("test_event") + result2 = ai_context.create_timer("2023-01-01T00:00:00Z") + + assert result1 == "external_event_task" + assert result2 == "timer_task" + orchestration_context.wait_for_external_event.assert_called_once_with("test_event") + orchestration_context.create_timer.assert_called_once_with("2023-01-01T00:00:00Z") + + def test_getattr_delegates_to_context(self): + """Test that __getattr__ delegates attribute access to the underlying context.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + + # Test delegation of various attributes + assert ai_context.instance_id == "test_instance_id" + assert ai_context.current_utc_datetime == "2023-01-01T00:00:00Z" + assert ai_context.is_replaying is False + 
+ def test_getattr_raises_attribute_error_for_nonexistent_attributes(self): + """Test that __getattr__ raises AttributeError for non-existent attributes.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + + with pytest.raises(AttributeError, match="'DurableAIAgentContext' object has no attribute 'nonexistent_attr'"): + _ = ai_context.nonexistent_attr + + def test_dir_includes_delegated_attributes(self): + """Test that __dir__ includes attributes from the underlying context.""" + orchestration_context = self._create_mock_orchestration_context() + task_tracker = self._create_mock_task_tracker() + + ai_context = DurableAIAgentContext(orchestration_context, task_tracker, None) + dir_result = dir(ai_context) + + # Should include delegated attributes from the underlying context + assert 'instance_id' in dir_result + assert 'current_utc_datetime' in dir_result + assert 'is_replaying' in dir_result + # Should also include public methods + assert 'call_activity' in dir_result + assert 'create_activity_tool' in dir_result diff --git a/tests/openai_agents/test_task_tracker.py b/tests/openai_agents/test_task_tracker.py new file mode 100644 index 00000000..92ba93ba --- /dev/null +++ b/tests/openai_agents/test_task_tracker.py @@ -0,0 +1,290 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import pytest +import json +from unittest.mock import Mock + +from azure.durable_functions.openai_agents.task_tracker import TaskTracker +from azure.durable_functions.openai_agents.exceptions import YieldException +from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext +from azure.durable_functions.models.history.HistoryEvent import HistoryEvent +from azure.durable_functions.models.history.HistoryEventType import HistoryEventType +from azure.durable_functions.models.RetryOptions import RetryOptions + + +class MockTask: + """Mock Task object for testing.""" + + def __init__(self, activity_name: str, input_data: str): + self.activity_name = activity_name + self.input = input_data + self.id = f"task_{activity_name}" + + +def create_mock_context(task_completed_results=None): + """Create a mock DurableOrchestrationContext with configurable history. + + Args: + ---- + task_completed_results: List of objects to be serialized as JSON results. + Each object will be json.dumps() serialized automatically. 
+ """ + context = Mock(spec=DurableOrchestrationContext) + + # Create history events for completed tasks + histories = [] + if task_completed_results: + for i, result_object in enumerate(task_completed_results): + history_event = Mock(spec=HistoryEvent) + history_event.event_type = HistoryEventType.TASK_COMPLETED + history_event.Result = json.dumps(result_object) + histories.append(history_event) + + context.histories = histories + + # Mock call_activity method + def mock_call_activity(activity_name, input_data): + return MockTask(activity_name, input_data) + + context.call_activity = Mock(side_effect=mock_call_activity) + + # Mock call_activity_with_retry method + def mock_call_activity_with_retry(activity_name, retry_options, input_data): + return MockTask(activity_name, input_data) + + context.call_activity_with_retry = Mock(side_effect=mock_call_activity_with_retry) + + return context + + +class TestTaskTracker: + """Tests for the TaskTracker implementation.""" + + def _consume_generator_with_return_value(self, generator): + """Consume a generator and capture both yielded items and return value. + + Returns + ------- + tuple + (yielded_items, return_value) where return_value is None if no return value + """ + yielded_items = [] + return_value = None + try: + while True: + yielded_items.append(next(generator)) + except StopIteration as e: + return_value = e.value + return yielded_items, return_value + + def test_get_activity_call_result_returns_result_when_history_available(self): + """Test get_activity_call_result returns result when history is available.""" + context = create_mock_context(task_completed_results=["test_result"]) + tracker = TaskTracker(context) + + result = tracker.get_activity_call_result("test_activity", "test_input") + assert result == "test_result" + + def test_get_activity_call_result_raises_yield_exception_when_no_history(self): + """Test get_activity_call_result raises YieldException when no history.""" + context = create_mock_context(task_completed_results=[]) + tracker = TaskTracker(context) + + with pytest.raises(YieldException) as exc_info: + tracker.get_activity_call_result("test_activity", "test_input") + + task = exc_info.value.task + assert task.activity_name == "test_activity" + assert task.input == "test_input" + + def test_get_activity_call_result_with_retry_returns_result_when_history_available(self): + """Test get_activity_call_result_with_retry returns result when history is available.""" + context = create_mock_context(task_completed_results=["result"]) + tracker = TaskTracker(context) + retry_options = RetryOptions(1000, 3) + + result = tracker.get_activity_call_result_with_retry("activity", retry_options, "input") + assert result == "result" + + def test_get_activity_call_result_with_retry_raises_yield_exception_when_no_history(self): + """Test get_activity_call_result_with_retry raises YieldException when no history.""" + context = create_mock_context(task_completed_results=[]) + tracker = TaskTracker(context) + retry_options = RetryOptions(1000, 3) + + with pytest.raises(YieldException) as exc_info: + tracker.get_activity_call_result_with_retry("activity", retry_options, "input") + + task = exc_info.value.task + assert task.activity_name == "activity" + assert task.input == "input" + + def test_multiple_activity_calls_with_partial_history(self): + """Test sequential activity calls with partial history available.""" + context = create_mock_context(task_completed_results=["result1", "result2"]) + tracker = TaskTracker(context) + + # First call 
returns result1 + result1 = tracker.get_activity_call_result("activity1", "input1") + assert result1 == "result1" + + # Second call returns result2 + result2 = tracker.get_activity_call_result("activity2", "input2") + assert result2 == "result2" + + # Third call raises YieldException (no more history) + with pytest.raises(YieldException): + tracker.get_activity_call_result("activity3", "input3") + + def test_execute_orchestrator_function_return_value(self): + """Test execute_orchestrator_function with orchestrator function that returns a value.""" + context = create_mock_context() + tracker = TaskTracker(context) + + expected_result = "orchestrator_result" + + def test_orchestrator(): + return expected_result + + result_gen = tracker.execute_orchestrator_function(test_orchestrator) + yielded_items, return_value = self._consume_generator_with_return_value(result_gen) + + # Should yield nothing and return the value + assert yielded_items == [] + assert return_value == expected_result + + def test_execute_orchestrator_function_get_activity_call_result_incomplete(self): + """Test execute_orchestrator_function with orchestrator function that tries to get an activity result before this activity call completes (not a replay).""" + context = create_mock_context() # No history available + tracker = TaskTracker(context) + + def test_orchestrator(): + return tracker.get_activity_call_result("activity", "test_input") + + result_gen = tracker.execute_orchestrator_function(test_orchestrator) + yielded_items, return_value = self._consume_generator_with_return_value(result_gen) + + # Should yield a task with this activity name + assert yielded_items[0].activity_name == "activity" + assert len(yielded_items) == 1 + assert return_value is None + + def test_execute_orchestrator_function_get_complete_activity_result(self): + """Test execute_orchestrator_function with orchestrator function that gets a complete activity call result (replay).""" + context = create_mock_context(task_completed_results=["activity_result"]) + tracker = TaskTracker(context) + + def test_orchestrator(): + return tracker.get_activity_call_result("activity", "test_input") + + result_gen = tracker.execute_orchestrator_function(test_orchestrator) + yielded_items, return_value = self._consume_generator_with_return_value(result_gen) + + # Should yield the queued task and return the result + assert yielded_items[0].activity_name == "activity" + assert len(yielded_items) == 1 + assert return_value == "activity_result" + + def test_execute_orchestrator_function_yields_tasks(self): + """Test execute_orchestrator_function with orchestrator function that yields tasks.""" + context = create_mock_context() + tracker = TaskTracker(context) + + def test_orchestrator(): + yield "task_1" + yield "task_2" + return "final_result" + + result_gen = tracker.execute_orchestrator_function(test_orchestrator) + yielded_items, return_value = self._consume_generator_with_return_value(result_gen) + + # Should yield the tasks in order and return the final result + assert yielded_items[0] == "task_1" + assert yielded_items[1] == "task_2" + assert len(yielded_items) == 2 + assert return_value == "final_result" + + def test_execute_orchestrator_function_context_activity_call_incomplete(self): + """Test execute_orchestrator_function with orchestrator function that tries to get an activity result before this activity call completes (not a replay) after a DurableAIAgentContext.call_activity invocation.""" + context = 
create_mock_context(task_completed_results=["result1"])
+        tracker = TaskTracker(context)
+
+        def test_orchestrator():
+            # Simulate invoking DurableAIAgentContext.call_activity and yielding the resulting task
+            tracker.record_activity_call()
+            yield "task"  # Produced "result1"
+
+            return tracker.get_activity_call_result("activity", "input")  # Incomplete, should raise YieldException that will be translated to yield
+
+        result_gen = tracker.execute_orchestrator_function(test_orchestrator)
+        yielded_items, return_value = self._consume_generator_with_return_value(result_gen)
+
+        # Should yield the incomplete task
+        assert yielded_items[0] == "task"
+        assert yielded_items[1].activity_name == "activity"
+        assert len(yielded_items) == 2
+        assert return_value is None
+
+    def test_execute_orchestrator_function_context_activity_call_complete(self):
+        """Test execute_orchestrator_function with orchestrator function that gets a complete activity call result (replay) after a DurableAIAgentContext.call_activity invocation."""
+        context = create_mock_context(task_completed_results=["result1", "result2"])
+        tracker = TaskTracker(context)
+
+        def test_orchestrator():
+            # Simulate invoking DurableAIAgentContext.call_activity and yielding the resulting task
+            tracker.record_activity_call()
+            yield "task"  # Produced "result1"
+
+            return tracker.get_activity_call_result("activity", "input")  # Complete, should return "result2"
+
+        result_gen = tracker.execute_orchestrator_function(test_orchestrator)
+        yielded_items, return_value = self._consume_generator_with_return_value(result_gen)
+
+        # Should yield the queued task and return the result
+        assert yielded_items[0] == "task"
+        assert yielded_items[1].activity_name == "activity"
+        assert len(yielded_items) == 2
+        assert return_value == "result2"
+
+    def test_execute_orchestrator_function_mixed_behaviors_combination(self):
+        """Test execute_orchestrator_function mixing all documented behaviors."""
+        context = create_mock_context(task_completed_results=[
+            "result1",
+            "result2",
+            "result3",
+            "result4"
+        ])
+        tracker = TaskTracker(context)
+
+        def test_orchestrator():
+            activity1_result = tracker.get_activity_call_result("activity1", "input1")
+
+            # Simulate invoking DurableAIAgentContext.call_activity("activity2") and yielding the resulting task
+            tracker.record_activity_call()
+            yield "yielded task from activity2"  # Produced "result2"
+
+            # Yield a regular task, possibly returned from DurableAIAgentContext methods like wait_for_external_event, etc.
+ yield "another yielded task" + + activity3_result = tracker.get_activity_call_result("activity3", "input3") + + # Simulate invoking DurableAIAgentContext.call_activity("activity4") and yielding the resulting task + tracker.record_activity_call() + yield "yielded task from activity4" # Produced "result4" + + return f"activity1={activity1_result};activity3={activity3_result}" + + result_gen = tracker.execute_orchestrator_function(test_orchestrator) + yielded_items, return_value = self._consume_generator_with_return_value(result_gen) + + # Verify yield order + assert yielded_items[0].activity_name == "activity1" + assert yielded_items[1] == "yielded task from activity2" + assert yielded_items[2] == "another yielded task" + assert yielded_items[3].activity_name == "activity3" + assert yielded_items[4] == "yielded task from activity4" + assert len(yielded_items) == 5 + + # Verify return value + expected_return = "activity1=result1;activity3=result3" + assert return_value == expected_return diff --git a/tests/openai_agents/test_usage_telemetry.py b/tests/openai_agents/test_usage_telemetry.py new file mode 100644 index 00000000..4f911b24 --- /dev/null +++ b/tests/openai_agents/test_usage_telemetry.py @@ -0,0 +1,99 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import unittest.mock + + +class TestUsageTelemetry: + """Test cases for the UsageTelemetry class.""" + + def test_log_usage_once_logs_message_on_first_call(self, capsys): + """Test that log_usage_once logs the telemetry message.""" + # Reset any previous state by creating a fresh import + import importlib + from azure.durable_functions.openai_agents import usage_telemetry + importlib.reload(usage_telemetry) + UsageTelemetryFresh = usage_telemetry.UsageTelemetry + + def mock_version(package_name): + if package_name == "azure-functions-durable": + return "1.3.4" + elif package_name == "openai": + return "1.98.0" + elif package_name == "openai-agents": + return "0.2.5" + return "unknown" + + with unittest.mock.patch('importlib.metadata.version', side_effect=mock_version): + UsageTelemetryFresh.log_usage_once() + + captured = capsys.readouterr() + assert captured.out.startswith("LanguageWorkerConsoleLog") + assert "Detected OpenAI Agents SDK integration with Durable Functions." in captured.out + assert "azure-functions-durable=1.3.4" in captured.out + assert "openai=1.98.0" in captured.out + assert "openai-agents=0.2.5" in captured.out + + def test_log_usage_handles_package_version_errors(self, capsys): + """Test that log_usage_once handles package version lookup errors gracefully.""" + # Reset any previous state by creating a fresh import + import importlib + from azure.durable_functions.openai_agents import usage_telemetry + importlib.reload(usage_telemetry) + UsageTelemetryFresh = usage_telemetry.UsageTelemetry + + # Test with mixed success/failure scenario: some packages work, others fail + def mock_version(package_name): + if package_name == "azure-functions-durable": + return "1.3.4" + elif package_name == "openai": + raise Exception("Package not found") + elif package_name == "openai-agents": + return "0.2.5" + return "unknown" + + with unittest.mock.patch('importlib.metadata.version', side_effect=mock_version): + UsageTelemetryFresh.log_usage_once() + + captured = capsys.readouterr() + assert captured.out.startswith("LanguageWorkerConsoleLog") + assert "Detected OpenAI Agents SDK integration with Durable Functions." 
in captured.out
+        # Should handle errors gracefully: successful packages show versions, failed ones show "(not installed)"
+        assert "azure-functions-durable=1.3.4" in captured.out
+        assert "openai=(not installed)" in captured.out
+        assert "openai-agents=0.2.5" in captured.out
+
+    def test_log_usage_works_with_real_packages(self, capsys):
+        """Test that log_usage_once works with real package versions."""
+        # Reset any previous state by creating a fresh import
+        import importlib
+        from azure.durable_functions.openai_agents import usage_telemetry
+        importlib.reload(usage_telemetry)
+        UsageTelemetryFresh = usage_telemetry.UsageTelemetry
+
+        # Test without mocking to see the real behavior
+        UsageTelemetryFresh.log_usage_once()
+
+        captured = capsys.readouterr()
+        assert captured.out.startswith("LanguageWorkerConsoleLog")
+        assert "Detected OpenAI Agents SDK integration with Durable Functions." in captured.out
+        # Should contain some version information or (unavailable)
+        assert ("azure-functions-durable=" in captured.out or "(unavailable)" in captured.out)
+
+    def test_log_usage_once_is_idempotent(self, capsys):
+        """Test that multiple calls to log_usage_once only log once."""
+        # Reset any previous state by creating a fresh import
+        import importlib
+        from azure.durable_functions.openai_agents import usage_telemetry
+        importlib.reload(usage_telemetry)
+        UsageTelemetryFresh = usage_telemetry.UsageTelemetry
+
+        with unittest.mock.patch('importlib.metadata.version', return_value="1.0.0"):
+            # Call multiple times
+            UsageTelemetryFresh.log_usage_once()
+            UsageTelemetryFresh.log_usage_once()
+            UsageTelemetryFresh.log_usage_once()
+
+        captured = capsys.readouterr()
+        # Should only see one log message despite multiple calls
+        log_count = captured.out.count("LanguageWorkerConsoleLogDetected OpenAI Agents SDK integration")
+        assert log_count == 1
\ No newline at end of file
diff --git a/tests/orchestrator/openai_agents/__init__.py b/tests/orchestrator/openai_agents/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/orchestrator/openai_agents/test_openai_agents.py b/tests/orchestrator/openai_agents/test_openai_agents.py
new file mode 100644
index 00000000..a351985d
--- /dev/null
+++ b/tests/orchestrator/openai_agents/test_openai_agents.py
@@ -0,0 +1,316 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import azure.durable_functions as df
+import azure.functions as func
+import json
+import pydantic
+from typing import TypedDict
+from agents import Agent, Runner
+from azure.durable_functions.models import OrchestratorState
+from azure.durable_functions.models.actions import CallActivityAction
+from azure.durable_functions.models.ReplaySchema import ReplaySchema
+from openai import BaseModel
+from tests.orchestrator.orchestrator_test_utils import get_orchestration_state_result, assert_valid_schema, \
+    assert_orchestration_state_equals
+from tests.test_utils.ContextBuilder import ContextBuilder
+
+app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
+
+@app.function_name("openai_agent_hello_world")
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator(model_retry_options=None)
+def openai_agent_hello_world(context):
+    agent = Agent(
+        name="Assistant",
+        instructions="You only respond in haikus."
+    )
+
+    result = Runner.run_sync(agent, "Tell me about recursion in programming.")
+
+    return result.final_output
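Not part of this change, but for orientation: an orchestrator registered with durable_openai_agent_orchestrator is started like any other Durable Functions orchestration. A minimal HTTP-starter sketch, reusing the app, func, and df names defined above (the route name is an arbitrary choice for illustration):

# Illustrative starter; the tests in this file never use one, they drive the
# orchestrator directly through ContextBuilder histories instead.
@app.route(route="agents/hello")
@app.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest,
                     client: df.DurableOrchestrationClient) -> func.HttpResponse:
    # Schedule the orchestration and hand back the standard status-query response.
    instance_id = await client.start_new("openai_agent_hello_world")
    return client.create_check_status_response(req, instance_id)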
+#
+# Run an agent that uses various tools.
+#
+class Weather(BaseModel):
+    city: str
+    temperature_range: str
+    conditions: str
+
+    @staticmethod
+    def from_json(data: str) -> "Weather":
+        return Weather(**json.loads(data))
+
+@app.activity_trigger(input_name="city")
+def get_weather(city: str) -> Weather:
+    print("[debug] get_weather called")
+    return Weather(city=city, temperature_range="14-20C", conditions="Sunny with wind.")
+
+@app.function_name("openai_agent_use_tool")
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator(model_retry_options=None)
+def openai_agent_use_tool(context):
+    agent = Agent(
+        name="Assistant",
+        instructions="You only respond in haikus.",
+        tools=[context.create_activity_tool(get_weather, retry_options=None)]
+    )
+
+    result = Runner.run_sync(agent, "Tell me the weather in Seattle.")
+
+    return result.final_output
+
+@app.activity_trigger(input_name="city", activity="get_weather_with_explicit_name")
+def get_named_weather(city: str) -> Weather:
+    print("[debug] get_named_weather called")
+    return Weather(city=city, temperature_range="14-20C", conditions="Sunny with wind.")
+
+@app.function_name("openai_agent_use_tool_with_explicit_name")
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator(model_retry_options=None)
+def openai_agent_use_tool_with_explicit_name(context):
+    agent = Agent(
+        name="Assistant",
+        instructions="You only respond in haikus.",
+        tools=[context.create_activity_tool(get_named_weather, retry_options=None)]
+    )
+
+    result = Runner.run_sync(agent, "Tell me the weather in Seattle.")
+
+    return result.final_output
+
+@app.function_name("openai_agent_return_string_type")
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator(model_retry_options=None)
+def openai_agent_return_string_type(context):
+    return "Hello World"
+
+class DurableModel:
+    def __init__(self, property: str) -> None:
+        self._property = property
+
+    def to_json(self) -> str:
+        return json.dumps({"property": self._property})
+
+@app.function_name("openai_agent_return_durable_model_type")
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator(model_retry_options=None)
+def openai_agent_return_durable_model_type(context):
+    model = DurableModel(property="value")
+
+    return model
+
+class TypedDictionaryModel(TypedDict):
+    property: str
+
+@app.function_name("openai_agent_return_typed_dictionary_model_type")
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator(model_retry_options=None)
+def openai_agent_return_typed_dictionary_model_type(context):
+    model = TypedDictionaryModel(property="value")
+
+    return model
+
+class OpenAIPydanticModel(BaseModel):
+    property: str
+
+@app.function_name("openai_agent_return_openai_pydantic_model_type")
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator(model_retry_options=None)
+def openai_agent_return_openai_pydantic_model_type(context):
+    model = OpenAIPydanticModel(property="value")
+
+    return model
+
+class PydanticModel(pydantic.BaseModel):
+    property: str
+
+@app.function_name("openai_agent_return_pydantic_model_type")
+@app.orchestration_trigger(context_name="context")
+@app.durable_openai_agent_orchestrator(model_retry_options=None)
+def openai_agent_return_pydantic_model_type(context):
+    model = PydanticModel(property="value")
+
+    return model
+
+model_activity_name = "run_model"
+
+def base_expected_state(output=None, replay_schema: ReplaySchema =
ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema) + +def add_activity_action(state: OrchestratorState, input_: str, activity_name=model_activity_name): + action = CallActivityAction(function_name=activity_name, input_=input_) + state.actions.append([action]) + +def add_activity_completed_events( + context_builder: ContextBuilder, id_: int, result: str, is_played=False, activity_name=model_activity_name): + context_builder.add_task_scheduled_event(name=activity_name, id_=id_) + context_builder.add_orchestrator_completed_event() + context_builder.add_orchestrator_started_event() + context_builder.add_task_completed_event(id_=id_, result=json.dumps(result), is_played=is_played) + +def test_openai_agent_hello_world_start(): + context_builder = ContextBuilder('test_openai_agent_hello_world_start') + + result = get_orchestration_state_result( + context_builder, openai_agent_hello_world, uses_pystein=True) + + expected_state = base_expected_state() + add_activity_action(expected_state, "{\"input\":[{\"content\":\"Tell me about recursion in programming.\",\"role\":\"user\"}],\"model_settings\":{\"temperature\":null,\"top_p\":null,\"frequency_penalty\":null,\"presence_penalty\":null,\"tool_choice\":null,\"parallel_tool_calls\":null,\"truncation\":null,\"max_tokens\":null,\"reasoning\":null,\"metadata\":null,\"store\":null,\"include_usage\":null,\"response_include\":null,\"extra_query\":null,\"extra_body\":null,\"extra_headers\":null,\"extra_args\":null},\"tracing\":0,\"model_name\":null,\"system_instructions\":\"You only respond in haikus.\",\"tools\":[],\"output_schema\":null,\"handoffs\":[],\"previous_response_id\":null,\"prompt\":null}") + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_hello_world_completed(): + context_builder = ContextBuilder('test_openai_agent_hello_world_completed') + add_activity_completed_events(context_builder, 0, '{"output":[{"id":"msg_68b9b2a9c67c81a38559c20c18fe86040a86c28ba39b53e8","content":[{"annotations":[],"text":"Skyscrapers whisper— \\nTaxis hum beneath the lights, \\nCity dreams don’t sleep.","type":"output_text","logprobs":null}],"role":"assistant","status":"completed","type":"message"}],"usage":{"requests":1,"input_tokens":27,"input_tokens_details":{"cached_tokens":0},"output_tokens":21,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":48},"response_id":"resp_68b9b2a9461481a3984d0f790dd33f7b0a86c28ba39b53e8"}') + + result = get_orchestration_state_result( + context_builder, openai_agent_hello_world, uses_pystein=True) + + expected_state = base_expected_state() + add_activity_action(expected_state, "{\"input\":[{\"content\":\"Tell me about recursion in programming.\",\"role\":\"user\"}],\"model_settings\":{\"temperature\":null,\"top_p\":null,\"frequency_penalty\":null,\"presence_penalty\":null,\"tool_choice\":null,\"parallel_tool_calls\":null,\"truncation\":null,\"max_tokens\":null,\"reasoning\":null,\"metadata\":null,\"store\":null,\"include_usage\":null,\"response_include\":null,\"extra_query\":null,\"extra_body\":null,\"extra_headers\":null,\"extra_args\":null},\"tracing\":0,\"model_name\":null,\"system_instructions\":\"You only respond in haikus.\",\"tools\":[],\"output_schema\":null,\"handoffs\":[],\"previous_response_id\":null,\"prompt\":null}") + expected_state._is_done = True + expected_state._output = 'Skyscrapers whisper— \nTaxis hum beneath the lights, 
\nCity dreams don’t sleep.' + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_use_tool_activity_start(): + context_builder = ContextBuilder('test_openai_agent_use_tool_start') + add_activity_completed_events(context_builder, 0, '{"output":[{"arguments":"{\\"args\\":\\"Seattle, WA\\"}","call_id":"call_mEdywElQTNpxAdivuEFjO0cT","name":"get_weather","type":"function_call","id":"fc_68b9ecc0ff9c819f863d6cf9e0a1b4e101011fd6f5f8c0a6","status":"completed"}],"usage":{"requests":1,"input_tokens":57,"input_tokens_details":{"cached_tokens":0},"output_tokens":17,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":74},"response_id":"resp_68b9ecc092e0819fb79b97c11aacef2001011fd6f5f8c0a6"}') + + result = get_orchestration_state_result( + context_builder, openai_agent_use_tool, uses_pystein=True) + + expected_state = base_expected_state() + add_activity_action(expected_state, "{\"input\":[{\"content\":\"Tell me the weather in Seattle.\",\"role\":\"user\"}],\"model_settings\":{\"temperature\":null,\"top_p\":null,\"frequency_penalty\":null,\"presence_penalty\":null,\"tool_choice\":null,\"parallel_tool_calls\":null,\"truncation\":null,\"max_tokens\":null,\"reasoning\":null,\"metadata\":null,\"store\":null,\"include_usage\":null,\"response_include\":null,\"extra_query\":null,\"extra_body\":null,\"extra_headers\":null,\"extra_args\":null},\"tracing\":0,\"model_name\":null,\"system_instructions\":\"You only respond in haikus.\",\"tools\":[{\"name\":\"get_weather\",\"description\":\"\",\"params_json_schema\":{\"properties\":{\"city\":{\"title\":\"City\",\"type\":\"string\"}},\"required\":[\"city\"],\"title\":\"get_weather_args\",\"type\":\"object\",\"additionalProperties\":false},\"strict_json_schema\":true}],\"output_schema\":null,\"handoffs\":[],\"previous_response_id\":null,\"prompt\":null}") + add_activity_action(expected_state, "{\"args\":\"Seattle, WA\"}", activity_name="get_weather") + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_use_explicitly_named_tool_activity_start(): + context_builder = ContextBuilder('test_openai_agent_use_tool_start') + add_activity_completed_events(context_builder, 0, '{"output":[{"arguments":"{\\"args\\":\\"Seattle, WA\\"}","call_id":"call_mEdywElQTNpxAdivuEFjO0cT","name":"get_named_weather","type":"function_call","id":"fc_68b9ecc0ff9c819f863d6cf9e0a1b4e101011fd6f5f8c0a6","status":"completed"}],"usage":{"requests":1,"input_tokens":57,"input_tokens_details":{"cached_tokens":0},"output_tokens":17,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":74},"response_id":"resp_68b9ecc092e0819fb79b97c11aacef2001011fd6f5f8c0a6"}') + + result = get_orchestration_state_result( + context_builder, openai_agent_use_tool_with_explicit_name, uses_pystein=True) + + expected_state = base_expected_state() + add_activity_action(expected_state, "{\"input\":[{\"content\":\"Tell me the weather in Seattle.\",\"role\":\"user\"}],\"model_settings\":{\"temperature\":null,\"top_p\":null,\"frequency_penalty\":null,\"presence_penalty\":null,\"tool_choice\":null,\"parallel_tool_calls\":null,\"truncation\":null,\"max_tokens\":null,\"reasoning\":null,\"metadata\":null,\"store\":null,\"include_usage\":null,\"response_include\":null,\"extra_query\":null,\"extra_body\":null,\"extra_headers\":null,\"extra_args\":null},\"tracing\":0,\"model_name\":null,\"system_instructions\":\"You only respond in 
haikus.\",\"tools\":[{\"name\":\"get_named_weather\",\"description\":\"\",\"params_json_schema\":{\"properties\":{\"city\":{\"title\":\"City\",\"type\":\"string\"}},\"required\":[\"city\"],\"title\":\"get_named_weather_args\",\"type\":\"object\",\"additionalProperties\":false},\"strict_json_schema\":true}],\"output_schema\":null,\"handoffs\":[],\"previous_response_id\":null,\"prompt\":null}") + add_activity_action(expected_state, "{\"args\":\"Seattle, WA\"}", activity_name="get_weather_with_explicit_name") + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_use_tool_activity_completed(): + context_builder = ContextBuilder('test_openai_agent_use_tool_start') + add_activity_completed_events(context_builder, 0, '{"output":[{"arguments":"{\\"args\\":\\"Seattle, WA\\"}","call_id":"call_mEdywElQTNpxAdivuEFjO0cT","name":"get_weather","type":"function_call","id":"fc_68b9ecc0ff9c819f863d6cf9e0a1b4e101011fd6f5f8c0a6","status":"completed"}],"usage":{"requests":1,"input_tokens":57,"input_tokens_details":{"cached_tokens":0},"output_tokens":17,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":74},"response_id":"resp_68b9ecc092e0819fb79b97c11aacef2001011fd6f5f8c0a6"}') + add_activity_completed_events(context_builder, 1, '{"__class__":"Weather","__module__":"function_app","__data__":"{\n \"city\": \"{\\\"args\\\":\\\"Seattle, WA\\\"}\",\n \"temperature_range\": \"14-20C\",\n \"conditions\": \"Sunny with wind.\"\n}"}') + + result = get_orchestration_state_result( + context_builder, openai_agent_use_tool, uses_pystein=True) + + expected_state = base_expected_state() + add_activity_action(expected_state, "{\"input\":[{\"content\":\"Tell me the weather in Seattle.\",\"role\":\"user\"}],\"model_settings\":{\"temperature\":null,\"top_p\":null,\"frequency_penalty\":null,\"presence_penalty\":null,\"tool_choice\":null,\"parallel_tool_calls\":null,\"truncation\":null,\"max_tokens\":null,\"reasoning\":null,\"metadata\":null,\"store\":null,\"include_usage\":null,\"response_include\":null,\"extra_query\":null,\"extra_body\":null,\"extra_headers\":null,\"extra_args\":null},\"tracing\":0,\"model_name\":null,\"system_instructions\":\"You only respond in haikus.\",\"tools\":[{\"name\":\"get_weather\",\"description\":\"\",\"params_json_schema\":{\"properties\":{\"city\":{\"title\":\"City\",\"type\":\"string\"}},\"required\":[\"city\"],\"title\":\"get_weather_args\",\"type\":\"object\",\"additionalProperties\":false},\"strict_json_schema\":true}],\"output_schema\":null,\"handoffs\":[],\"previous_response_id\":null,\"prompt\":null}") + add_activity_action(expected_state, "{\"args\":\"Seattle, WA\"}", activity_name="get_weather") + add_activity_action(expected_state, "{\"input\":[{\"content\":\"Tell me the weather in Seattle.\",\"role\":\"user\"},{\"arguments\":\"{\\\"args\\\":\\\"Seattle, WA\\\"}\",\"call_id\":\"call_mEdywElQTNpxAdivuEFjO0cT\",\"name\":\"get_weather\",\"type\":\"function_call\",\"id\":\"fc_68b9ecc0ff9c819f863d6cf9e0a1b4e101011fd6f5f8c0a6\",\"status\":\"completed\"},{\"call_id\":\"call_mEdywElQTNpxAdivuEFjO0cT\",\"output\":\"{\\\"__class__\\\":\\\"Weather\\\",\\\"__module__\\\":\\\"function_app\\\",\\\"__data__\\\":\\\"{\\n \\\"city\\\": \\\"{\\\\\\\"args\\\\\\\":\\\\\\\"Seattle, WA\\\\\\\"}\\\",\\n \\\"temperature_range\\\": \\\"14-20C\\\",\\n \\\"conditions\\\": \\\"Sunny with 
wind.\\\"\\n}\\\"}\",\"type\":\"function_call_output\"}],\"model_settings\":{\"temperature\":null,\"top_p\":null,\"frequency_penalty\":null,\"presence_penalty\":null,\"tool_choice\":null,\"parallel_tool_calls\":null,\"truncation\":null,\"max_tokens\":null,\"reasoning\":null,\"metadata\":null,\"store\":null,\"include_usage\":null,\"response_include\":null,\"extra_query\":null,\"extra_body\":null,\"extra_headers\":null,\"extra_args\":null},\"tracing\":0,\"model_name\":null,\"system_instructions\":\"You only respond in haikus.\",\"tools\":[{\"name\":\"get_weather\",\"description\":\"\",\"params_json_schema\":{\"properties\":{\"city\":{\"title\":\"City\",\"type\":\"string\"}},\"required\":[\"city\"],\"title\":\"get_weather_args\",\"type\":\"object\",\"additionalProperties\":false},\"strict_json_schema\":true}],\"output_schema\":null,\"handoffs\":[],\"previous_response_id\":null,\"prompt\":null}") + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_use_tool_analysis_completed(): + context_builder = ContextBuilder('test_openai_agent_use_tool_start') + add_activity_completed_events(context_builder, 0, '{"output":[{"arguments":"{\\"args\\":\\"Seattle, WA\\"}","call_id":"call_mEdywElQTNpxAdivuEFjO0cT","name":"get_weather","type":"function_call","id":"fc_68b9ecc0ff9c819f863d6cf9e0a1b4e101011fd6f5f8c0a6","status":"completed"}],"usage":{"requests":1,"input_tokens":57,"input_tokens_details":{"cached_tokens":0},"output_tokens":17,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":74},"response_id":"resp_68b9ecc092e0819fb79b97c11aacef2001011fd6f5f8c0a6"}') + add_activity_completed_events(context_builder, 1, '{"__class__":"Weather","__module__":"function_app","__data__":"{\n \"city\": \"{\\\"args\\\":\\\"Seattle, WA\\\"}\",\n \"temperature_range\": \"14-20C\",\n \"conditions\": \"Sunny with wind.\"\n}"}') + add_activity_completed_events(context_builder, 2, '{"output":[{"id":"msg_68b9f4b09c14819faa62abfd69cb53e501011fd6f5f8c0a6","content":[{"annotations":[],"text":"The weather in Seattle, WA is currently sunny with some wind. 
Temperatures are ranging from 14°C to 20°C.","type":"output_text","logprobs":null}],"role":"assistant","status":"completed","type":"message"}],"usage":{"requests":1,"input_tokens":107,"input_tokens_details":{"cached_tokens":0},"output_tokens":28,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":135},"response_id":"resp_68b9f4b00804819f9fe99eac95bd198e01011fd6f5f8c0a6"}') + + result = get_orchestration_state_result( + context_builder, openai_agent_use_tool, uses_pystein=True) + + expected_state = base_expected_state() + add_activity_action(expected_state, "{\"input\":[{\"content\":\"Tell me the weather in Seattle.\",\"role\":\"user\"}],\"model_settings\":{\"temperature\":null,\"top_p\":null,\"frequency_penalty\":null,\"presence_penalty\":null,\"tool_choice\":null,\"parallel_tool_calls\":null,\"truncation\":null,\"max_tokens\":null,\"reasoning\":null,\"metadata\":null,\"store\":null,\"include_usage\":null,\"response_include\":null,\"extra_query\":null,\"extra_body\":null,\"extra_headers\":null,\"extra_args\":null},\"tracing\":0,\"model_name\":null,\"system_instructions\":\"You only respond in haikus.\",\"tools\":[{\"name\":\"get_weather\",\"description\":\"\",\"params_json_schema\":{\"properties\":{\"city\":{\"title\":\"City\",\"type\":\"string\"}},\"required\":[\"city\"],\"title\":\"get_weather_args\",\"type\":\"object\",\"additionalProperties\":false},\"strict_json_schema\":true}],\"output_schema\":null,\"handoffs\":[],\"previous_response_id\":null,\"prompt\":null}") + add_activity_action(expected_state, "{\"args\":\"Seattle, WA\"}", activity_name="get_weather") + add_activity_action(expected_state, "{\"input\":[{\"content\":\"Tell me the weather in Seattle.\",\"role\":\"user\"},{\"arguments\":\"{\\\"args\\\":\\\"Seattle, WA\\\"}\",\"call_id\":\"call_mEdywElQTNpxAdivuEFjO0cT\",\"name\":\"get_weather\",\"type\":\"function_call\",\"id\":\"fc_68b9ecc0ff9c819f863d6cf9e0a1b4e101011fd6f5f8c0a6\",\"status\":\"completed\"},{\"call_id\":\"call_mEdywElQTNpxAdivuEFjO0cT\",\"output\":\"{\\\"__class__\\\":\\\"Weather\\\",\\\"__module__\\\":\\\"function_app\\\",\\\"__data__\\\":\\\"{\\n \\\"city\\\": \\\"{\\\\\\\"args\\\\\\\":\\\\\\\"Seattle, WA\\\\\\\"}\\\",\\n \\\"temperature_range\\\": \\\"14-20C\\\",\\n \\\"conditions\\\": \\\"Sunny with wind.\\\"\\n}\\\"}\",\"type\":\"function_call_output\"}],\"model_settings\":{\"temperature\":null,\"top_p\":null,\"frequency_penalty\":null,\"presence_penalty\":null,\"tool_choice\":null,\"parallel_tool_calls\":null,\"truncation\":null,\"max_tokens\":null,\"reasoning\":null,\"metadata\":null,\"store\":null,\"include_usage\":null,\"response_include\":null,\"extra_query\":null,\"extra_body\":null,\"extra_headers\":null,\"extra_args\":null},\"tracing\":0,\"model_name\":null,\"system_instructions\":\"You only respond in haikus.\",\"tools\":[{\"name\":\"get_weather\",\"description\":\"\",\"params_json_schema\":{\"properties\":{\"city\":{\"title\":\"City\",\"type\":\"string\"}},\"required\":[\"city\"],\"title\":\"get_weather_args\",\"type\":\"object\",\"additionalProperties\":false},\"strict_json_schema\":true}],\"output_schema\":null,\"handoffs\":[],\"previous_response_id\":null,\"prompt\":null}") + expected_state._is_done = True + expected_state._output = 'The weather in Seattle, WA is currently sunny with some wind. Temperatures are ranging from 14°C to 20°C.' 
+ expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_string_serialization(): + context_builder = ContextBuilder('test_openai_agent_string_serialization') + + result = get_orchestration_state_result( + context_builder, openai_agent_return_string_type, uses_pystein=True) + + expected_state = base_expected_state() + expected_state._is_done = True + expected_state._output = "Hello World" + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_durable_model_serialization(): + context_builder = ContextBuilder('test_openai_agent_durable_model_serialization') + + result = get_orchestration_state_result( + context_builder, openai_agent_return_durable_model_type, uses_pystein=True) + + expected_state = base_expected_state() + expected_state._is_done = True + expected_state._output = DurableModel(property="value").to_json() + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_typed_dictionary_model_serialization(): + context_builder = ContextBuilder('test_openai_agent_typed_dictionary_model_serialization') + + result = get_orchestration_state_result( + context_builder, openai_agent_return_typed_dictionary_model_type, uses_pystein=True) + + expected_state = base_expected_state() + expected_state._is_done = True + expected_state._output = json.dumps(TypedDictionaryModel(property="value")) + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_openai_pydantic_model_serialization(): + context_builder = ContextBuilder('test_openai_agent_openai_pydantic_model_serialization') + + result = get_orchestration_state_result( + context_builder, openai_agent_return_openai_pydantic_model_type, uses_pystein=True) + + expected_state = base_expected_state() + expected_state._is_done = True + expected_state._output = OpenAIPydanticModel(property="value").to_json() + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_openai_agent_pydantic_model_serialization(): + context_builder = ContextBuilder('test_openai_agent_pydantic_model_serialization') + + result = get_orchestration_state_result( + context_builder, openai_agent_return_pydantic_model_type, uses_pystein=True) + + expected_state = base_expected_state() + expected_state._is_done = True + expected_state._output = PydanticModel(property="value").model_dump_json() + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result)
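Taken together, the five serialization tests above fix the expected output encoding for each orchestrator return type. A rough restatement of what they assert (hypothetical helper for illustration, not the library's actual dispatch logic):

import json

def expected_serialized_output(value):
    """What the serialization tests above expect for each orchestrator return type."""
    if isinstance(value, str):
        return value                      # plain strings pass through unchanged
    if hasattr(value, "to_json"):
        return value.to_json()            # custom classes and openai.BaseModel
    if hasattr(value, "model_dump_json"):
        return value.model_dump_json()    # plain pydantic models
    return json.dumps(value)              # TypedDict / dict values

For example, expected_serialized_output(PydanticModel(property="value")) produces the same string that test_openai_agent_pydantic_model_serialization compares against.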