Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions azure/durable_functions/openai_agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@

__all__ = [
'durable_openai_agent_orchestrator',
'DurableAIAgentContext',
]
'DurableAIAgentContext',
]
18 changes: 12 additions & 6 deletions azure/durable_functions/openai_agents/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@


class DurableAIAgentContext:
"""Context for AI agents running in Azure Durable Functions orchestration."""

def __init__(self, context: DurableOrchestrationContext):
self._context = context
self._activities_called = 0
Expand All @@ -36,14 +38,17 @@ def _get_activity_call_result(self, activity_name, input: str):
return result

def call_activity(self, activity_name, input: str):
"""Call an activity function and increment the activity counter."""
task = self._context.call_activity(activity_name, input)
self._activities_called += 1
return task

def set_custom_status(self, status: str):
"""Set custom status for the orchestration."""
self._context.set_custom_status(status)

def wait_for_external_event(self, event_name: str):
"""Wait for an external event in the orchestration."""
return self._context.wait_for_external_event(event_name)

def _yield_and_clear_tasks(self):
Expand All @@ -58,17 +63,18 @@ def activity_as_tool(
*,
description: Optional[str] = None,
) -> Tool:
"""
Convert an Azure Durable Functions activity to an OpenAI Agents SDK Tool.
"""Convert an Azure Durable Functions activity to an OpenAI Agents SDK Tool.

Args:
Args
----
activity_func: The Azure Functions activity function to convert
description: Optional description override for the tool

Returns:
Returns
-------
Tool: An OpenAI Agents SDK Tool object
"""

"""
activity_name = activity_func._function._name

async def run_activity(ctx: RunContextWrapper[Any], input: str) -> Any:
Expand All @@ -90,4 +96,4 @@ async def run_activity(ctx: RunContextWrapper[Any], input: str) -> Any:
params_json_schema=schema.params_json_schema,
on_invoke_tool=run_activity,
strict_json_schema=True,
)
)
21 changes: 11 additions & 10 deletions azure/durable_functions/openai_agents/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@


def _setup_durable_openai_agent(app: func.FunctionApp):
"""
Set up the Durable OpenAI Agent framework for the given FunctionApp.
"""Set up the Durable OpenAI Agent framework for the given FunctionApp.

This is automatically called when using the framework decorators.
"""
app_id = id(app)
Expand All @@ -27,30 +27,30 @@ def _setup_durable_openai_agent(app: func.FunctionApp):


def _find_function_app_in_module(module):
"""
Find a FunctionApp instance in the given module.
"""Find a FunctionApp instance in the given module.

Returns the first FunctionApp instance found, or None if none found.
"""
if not hasattr(module, '__dict__'):
return None

for name, obj in module.__dict__.items():
if isinstance(obj, func.FunctionApp):
return obj
return None


def _auto_setup_durable_openai_agent(decorated_func):
"""
Automatically detect and setup the FunctionApp for Durable OpenAI Agents.
"""Automatically detect and setup the FunctionApp for Durable OpenAI Agents.

This finds the FunctionApp in the same module as the decorated function.
"""
try:
# Get the module where the decorated function is defined
func_module = sys.modules.get(decorated_func.__module__)
if func_module is None:
return

# Find the FunctionApp instance in that module
app = _find_function_app_in_module(func_module)
if app is not None:
Expand All @@ -62,9 +62,10 @@ def _auto_setup_durable_openai_agent(decorated_func):


def durable_openai_agent_orchestrator(func):
"""Decorate Azure Durable Functions orchestrators that use OpenAI Agents."""
# Auto-setup: Find and configure the FunctionApp when decorator is applied
_auto_setup_durable_openai_agent(func)

@wraps(func)
def wrapper(durable_orchestration_context: DurableOrchestrationContext):
ensure_event_loop()
Expand Down Expand Up @@ -116,4 +117,4 @@ def wrapper(durable_orchestration_context: DurableOrchestrationContext):
finally:
yield from durable_ai_agent_context._yield_and_clear_tasks()

return wrapper
return wrapper
7 changes: 3 additions & 4 deletions azure/durable_functions/openai_agents/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@


def ensure_event_loop():
"""
Ensure an event loop is available for sync execution context.

"""Ensure an event loop is available for sync execution context.

This is necessary when calling Runner.run_sync from Azure Functions
Durable orchestrators, which run in a synchronous context but need
an event loop for internal async operations.
Expand All @@ -13,4 +12,4 @@ def ensure_event_loop():
asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
asyncio.set_event_loop(loop)
5 changes: 4 additions & 1 deletion azure/durable_functions/openai_agents/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from azure.durable_functions.models.Task import TaskBase


class YieldException(BaseException):
"""Exception raised when an orchestrator should yield control."""

def __init__(self, task: TaskBase):
super().__init__("Orchestrator should yield.")
self.task = task
self.task = task
46 changes: 30 additions & 16 deletions azure/durable_functions/openai_agents/model_invocation_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import enum
import json
import logging
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, AsyncIterator, Optional, Union, cast

Expand Down Expand Up @@ -32,7 +31,6 @@
from agents.items import TResponseStreamEvent
from openai import (
APIStatusError,
AsyncOpenAI,
)
from openai.types.responses.tool_param import Mcp
from openai.types.responses.response_prompt_param import ResponsePromptParam
Expand All @@ -44,7 +42,9 @@
except ImportError:
# Fallback if ApplicationError is not available
class ApplicationError(Exception):
def __init__(self, message: str, non_retryable: bool = False, next_retry_delay = None):
"""Custom application error for handling retryable and non-retryable errors."""

def __init__(self, message: str, non_retryable: bool = False, next_retry_delay=None):
super().__init__(message)
self.non_retryable = non_retryable
self.next_retry_delay = next_retry_delay
Expand All @@ -53,8 +53,11 @@ def __init__(self, message: str, non_retryable: bool = False, next_retry_delay =


class HandoffInput(BaseModel):
"""Data conversion friendly representation of a Handoff. Contains only the fields which are needed by the model
execution to determine what to handoff to, not the actual handoff invocation, which remains in the workflow context.
"""Data conversion friendly representation of a Handoff.

Contains only the fields which are needed by the model execution to
determine what to handoff to, not the actual handoff invocation,
which remains in the workflow context.
"""

tool_name: str
Expand All @@ -65,8 +68,11 @@ class HandoffInput(BaseModel):


class FunctionToolInput(BaseModel):
"""Data conversion friendly representation of a FunctionTool. Contains only the fields which are needed by the model
execution to determine what tool to call, not the actual tool invocation, which remains in the workflow context.
"""Data conversion friendly representation of a FunctionTool.

Contains only the fields which are needed by the model execution to
determine what tool to call, not the actual tool invocation,
which remains in the workflow context.
"""

name: str
Expand All @@ -76,8 +82,11 @@ class FunctionToolInput(BaseModel):


class HostedMCPToolInput(BaseModel):
"""Data conversion friendly representation of a HostedMCPTool. Contains only the fields which are needed by the model
execution to determine what tool to call, not the actual tool invocation, which remains in the workflow context.
"""Data conversion friendly representation of a HostedMCPTool.

Contains only the fields which are needed by the model execution to
determine what tool to call, not the actual tool invocation,
which remains in the workflow context.
"""

tool_config: Mcp
Expand Down Expand Up @@ -110,7 +119,7 @@ def is_strict_json_schema(self) -> bool:
return self.strict_json_schema

def json_schema(self) -> dict[str, Any]:
"""The JSON schema of the output type."""
"""Get the JSON schema of the output type."""
if self.is_plain_text():
raise UserError("Output type is plain text, so no JSON schema is available")
if self.output_schema is None:
Expand Down Expand Up @@ -164,6 +173,8 @@ def from_json(cls, json_str: str) -> 'ActivityModelInput':


class ModelInvoker:
"""Handles OpenAI model invocations for Durable Functions activities."""

def __init__(self, model_provider: Optional[ModelProvider] = None):
"""Initialize the activity with a model provider."""
self._model_provider = model_provider or OpenAIProvider()
Expand Down Expand Up @@ -333,7 +344,8 @@ def make_tool_info(tool: Tool) -> ToolInput:
output_schema, AgentOutputSchema
):
raise TypeError(
f"Only AgentOutputSchema is supported by Durable Model, got {type(output_schema).__name__}"
f"Only AgentOutputSchema is supported by Durable Model, "
f"got {type(output_schema).__name__}"
)
agent_output_schema = output_schema
output_schema_input = (
Expand Down Expand Up @@ -364,7 +376,9 @@ def make_tool_info(tool: Tool) -> ToolInput:

activity_input_json = activity_input.to_json()

response = self.context._get_activity_call_result("invoke_model_activity", activity_input_json)
response = self.context._get_activity_call_result(
"invoke_model_activity", activity_input_json
)
json_response = json.loads(response)
model_response = ModelResponse(**json_response)
return model_response
Expand All @@ -387,16 +401,16 @@ def stream_response(

def create_invoke_model_activity(app: func.FunctionApp):
"""Create and register the invoke_model_activity function with the provided FunctionApp."""

@app.activity_trigger(input_name="input")
async def invoke_model_activity(input: str):
"""Activity that handles OpenAI model invocations."""
activity_input = ActivityModelInput.from_json(input)

model_invoker = ModelInvoker()
result = await model_invoker.invoke_model_activity(activity_input)

json_obj = ModelResponse.__pydantic_serializer__.to_json(result)
return json_obj.decode()
return invoke_model_activity

return invoke_model_activity
18 changes: 12 additions & 6 deletions azure/durable_functions/openai_agents/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@


class DurableOpenAIRunner:
"""Runner for OpenAI agents using Durable Functions orchestration."""

def __init__(self, context: DurableAIAgentContext) -> None:
self._runner = DEFAULT_AGENT_RUNNER or AgentRunner()
self.context = context
Expand All @@ -31,6 +33,7 @@ def run_sync(
input: Union[str, list[TResponseInputItem]],
**kwargs: Any,
) -> RunResult:
"""Run an agent synchronously with the given input and configuration."""
# workaround for https://github.com/pydantic/pydantic/issues/9541
# ValidatorIterator returned
input_json = to_json(input)
Expand All @@ -49,14 +52,15 @@ def run_sync(
model_name = run_config.model or starting_agent.model
if model_name is not None and not isinstance(model_name, str):
raise ValueError(
"Durable Functions require a model name to be a string in the run config and/or agent."
"Durable Functions require a model name to be a string in the "
"run config and/or agent."
)

updated_run_config = replace(
run_config,
model = _DurableModelStub(
model_name = model_name,
context = self.context,
model=_DurableModelStub(
model_name=model_name,
context=self.context,
),
)

Expand All @@ -77,6 +81,7 @@ def run(
input: Union[str, list[TResponseInputItem]],
**kwargs: Any,
) -> RunResult:
"""Run an agent asynchronously. Not supported in Durable Functions."""
raise RuntimeError("Durable Functions do not support asynchronous runs.")

def run_streamed(
Expand All @@ -85,4 +90,5 @@ def run_streamed(
input: Union[str, list[TResponseInputItem]],
**kwargs: Any,
) -> RunResultStreaming:
raise RuntimeError("Durable Functions do not support streaming.")
"""Run an agent with streaming. Not supported in Durable Functions."""
raise RuntimeError("Durable Functions do not support streaming.")