Skip to content

Commit 4bef7ce

Browse files
authored
Durable OpenAI Agents (#574)
1 parent abb2392 commit 4bef7ce

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+4012
-4
lines changed

azure/durable_functions/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,11 @@ def validate_extension_bundles():
7979
__all__.append('Blueprint')
8080
except ModuleNotFoundError:
8181
pass
82+
83+
# Import OpenAI Agents integration (optional dependency)
84+
try:
85+
from . import openai_agents # noqa
86+
__all__.append('openai_agents')
87+
except ImportError:
88+
# OpenAI agents integration requires additional dependencies
89+
pass

azure/durable_functions/decorators/durable_app.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
3-
from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger,\
3+
4+
from azure.durable_functions.models.RetryOptions import RetryOptions
5+
from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger, \
46
DurableClient
57
from typing import Callable, Optional
68
from azure.durable_functions.entity import Entity
@@ -45,6 +47,7 @@ def __init__(self,
4547
New instance of a Durable Functions app
4648
"""
4749
super().__init__(auth_level=http_auth_level)
50+
self._is_durable_openai_agent_setup = False
4851

4952
def _configure_entity_callable(self, wrap) -> Callable:
5053
"""Obtain decorator to construct an Entity class from a user-defined Function.
@@ -250,6 +253,66 @@ def decorator():
250253

251254
return wrap
252255

256+
def _create_invoke_model_activity(self, model_provider, activity_name):
257+
"""Create and register the invoke_model_activity function with the provided FunctionApp."""
258+
259+
@self.activity_trigger(input_name="input", activity=activity_name)
260+
async def run_model_activity(input: str):
261+
from azure.durable_functions.openai_agents.orchestrator_generator\
262+
import durable_openai_agent_activity
263+
264+
return await durable_openai_agent_activity(input, model_provider)
265+
266+
return run_model_activity
267+
268+
def _setup_durable_openai_agent(self, model_provider, activity_name):
269+
if not self._is_durable_openai_agent_setup:
270+
self._create_invoke_model_activity(model_provider, activity_name)
271+
self._is_durable_openai_agent_setup = True
272+
273+
def durable_openai_agent_orchestrator(
274+
self,
275+
_func=None,
276+
*,
277+
model_provider=None,
278+
model_retry_options: Optional[RetryOptions] = RetryOptions(
279+
first_retry_interval_in_milliseconds=2000, max_number_of_attempts=5
280+
),
281+
):
282+
"""Decorate Azure Durable Functions orchestrators that use OpenAI Agents.
283+
284+
Parameters
285+
----------
286+
model_provider: Optional[ModelProvider]
287+
Use a non-default ModelProvider instead of the default OpenAIProvider,
288+
such as when testing.
289+
"""
290+
from agents import ModelProvider
291+
from azure.durable_functions.openai_agents.orchestrator_generator\
292+
import durable_openai_agent_orchestrator_generator
293+
294+
if model_provider is not None and type(model_provider) is not ModelProvider:
295+
raise TypeError("Provided model provider must be of type ModelProvider")
296+
297+
activity_name = "run_model"
298+
299+
self._setup_durable_openai_agent(model_provider, activity_name)
300+
301+
def generator_wrapper_wrapper(func):
302+
303+
@wraps(func)
304+
def generator_wrapper(context):
305+
return durable_openai_agent_orchestrator_generator(
306+
func, context, model_retry_options, activity_name
307+
)
308+
309+
return generator_wrapper
310+
311+
if _func is None:
312+
return generator_wrapper_wrapper
313+
else:
314+
return generator_wrapper_wrapper(_func)
315+
253316

254317
class DFApp(Blueprint, FunctionRegister):
255318
"""Durable Functions (DF) app.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
"""OpenAI Agents integration for Durable Functions.
4+
5+
This module provides decorators and utilities to integrate OpenAI Agents
6+
with Durable Functions orchestration patterns.
7+
"""
8+
9+
from .context import DurableAIAgentContext
10+
11+
__all__ = [
12+
'DurableAIAgentContext',
13+
]
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import json
4+
from typing import Any, Callable, Optional, TYPE_CHECKING, Union
5+
6+
from azure.durable_functions.models.DurableOrchestrationContext import (
7+
DurableOrchestrationContext,
8+
)
9+
from azure.durable_functions.models.RetryOptions import RetryOptions
10+
11+
from agents import RunContextWrapper, Tool
12+
from agents.function_schema import function_schema
13+
from agents.tool import FunctionTool
14+
15+
from azure.durable_functions.models.Task import TaskBase
16+
from .task_tracker import TaskTracker
17+
18+
19+
if TYPE_CHECKING:
20+
# At type-check time we want all members / signatures for IDE & linters.
21+
_BaseDurableContext = DurableOrchestrationContext
22+
else:
23+
class _BaseDurableContext: # lightweight runtime stub
24+
"""Runtime stub base class for delegation; real context is wrapped.
25+
26+
At runtime we avoid inheriting from DurableOrchestrationContext so that
27+
attribute lookups for its members are delegated via __getattr__ to the
28+
wrapped ``_context`` instance.
29+
"""
30+
31+
__slots__ = ()
32+
33+
34+
class DurableAIAgentContext(_BaseDurableContext):
35+
"""Context for AI agents running in Azure Durable Functions orchestration.
36+
37+
Design
38+
------
39+
* Static analysis / IDEs: Appears to subclass ``DurableOrchestrationContext`` so
40+
you get autocompletion and type hints (under TYPE_CHECKING branch).
41+
* Runtime: Inherits from a trivial stub. All durable orchestration operations
42+
are delegated to the real ``DurableOrchestrationContext`` instance provided
43+
as ``context`` and stored in ``_context``.
44+
45+
Consequences
46+
------------
47+
* ``isinstance(DurableAIAgentContext, DurableOrchestrationContext)`` is **False** at
48+
runtime (expected).
49+
* Delegation via ``__getattr__`` works for every member of the real context.
50+
* No reliance on internal initialization side-effects of the durable SDK.
51+
"""
52+
53+
def __init__(
54+
self,
55+
context: DurableOrchestrationContext,
56+
task_tracker: TaskTracker,
57+
model_retry_options: Optional[RetryOptions],
58+
):
59+
self._context = context
60+
self._task_tracker = task_tracker
61+
self._model_retry_options = model_retry_options
62+
63+
def call_activity(
64+
self, name: Union[str, Callable], input_: Optional[Any] = None
65+
) -> TaskBase:
66+
"""Schedule an activity for execution.
67+
68+
Parameters
69+
----------
70+
name: str | Callable
71+
Either the name of the activity function to call, as a string or,
72+
in the Python V2 programming model, the activity function itself.
73+
input_: Optional[Any]
74+
The JSON-serializable input to pass to the activity function.
75+
76+
Returns
77+
-------
78+
Task
79+
A Durable Task that completes when the called activity function completes or fails.
80+
"""
81+
task = self._context.call_activity(name, input_)
82+
self._task_tracker.record_activity_call()
83+
return task
84+
85+
def call_activity_with_retry(
86+
self,
87+
name: Union[str, Callable],
88+
retry_options: RetryOptions,
89+
input_: Optional[Any] = None,
90+
) -> TaskBase:
91+
"""Schedule an activity for execution with retry options.
92+
93+
Parameters
94+
----------
95+
name: str | Callable
96+
Either the name of the activity function to call, as a string or,
97+
in the Python V2 programming model, the activity function itself.
98+
retry_options: RetryOptions
99+
The retry options for the activity function.
100+
input_: Optional[Any]
101+
The JSON-serializable input to pass to the activity function.
102+
103+
Returns
104+
-------
105+
Task
106+
A Durable Task that completes when the called activity function completes or
107+
fails completely.
108+
"""
109+
task = self._context.call_activity_with_retry(name, retry_options, input_)
110+
self._task_tracker.record_activity_call()
111+
return task
112+
113+
def create_activity_tool(
114+
self,
115+
activity_func: Callable,
116+
*,
117+
description: Optional[str] = None,
118+
retry_options: Optional[RetryOptions] = RetryOptions(
119+
first_retry_interval_in_milliseconds=2000, max_number_of_attempts=5
120+
),
121+
) -> Tool:
122+
"""Convert an Azure Durable Functions activity to an OpenAI Agents SDK Tool.
123+
124+
Args
125+
----
126+
activity_func: The Azure Functions activity function to convert
127+
description: Optional description override for the tool
128+
retry_options: The retry options for the activity function
129+
130+
Returns
131+
-------
132+
Tool: An OpenAI Agents SDK Tool object
133+
134+
"""
135+
if activity_func._function is None:
136+
raise ValueError("The provided function is not a valid Azure Function.")
137+
138+
if (activity_func._function._trigger is not None
139+
and activity_func._function._trigger.activity is not None):
140+
activity_name = activity_func._function._trigger.activity
141+
else:
142+
activity_name = activity_func._function._name
143+
144+
input_name = None
145+
if (activity_func._function._trigger is not None
146+
and hasattr(activity_func._function._trigger, 'name')):
147+
input_name = activity_func._function._trigger.name
148+
149+
async def run_activity(ctx: RunContextWrapper[Any], input: str) -> Any:
150+
# Parse JSON input and extract the named value if input_name is specified
151+
activity_input = input
152+
if input_name:
153+
try:
154+
parsed_input = json.loads(input)
155+
if isinstance(parsed_input, dict) and input_name in parsed_input:
156+
activity_input = parsed_input[input_name]
157+
# If parsing fails or the named parameter is not found, pass the original input
158+
except (json.JSONDecodeError, TypeError):
159+
pass
160+
161+
if retry_options:
162+
result = self._task_tracker.get_activity_call_result_with_retry(
163+
activity_name, retry_options, activity_input
164+
)
165+
else:
166+
result = self._task_tracker.get_activity_call_result(activity_name, activity_input)
167+
return result
168+
169+
schema = function_schema(
170+
func=activity_func._function._func,
171+
docstring_style=None,
172+
description_override=description,
173+
use_docstring_info=True,
174+
strict_json_schema=True,
175+
)
176+
177+
return FunctionTool(
178+
name=schema.name,
179+
description=schema.description or "",
180+
params_json_schema=schema.params_json_schema,
181+
on_invoke_tool=run_activity,
182+
strict_json_schema=True,
183+
)
184+
185+
def __getattr__(self, name):
186+
"""Delegate missing attributes to the underlying DurableOrchestrationContext."""
187+
try:
188+
return getattr(self._context, name)
189+
except AttributeError:
190+
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
191+
192+
def __dir__(self):
193+
"""Improve introspection and tab-completion by including delegated attributes."""
194+
return sorted(set(dir(type(self)) + list(self.__dict__) + dir(self._context)))
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import asyncio
4+
5+
6+
def ensure_event_loop():
7+
"""Ensure an event loop is available for sync execution context.
8+
9+
This is necessary when calling Runner.run_sync from Azure Functions
10+
Durable orchestrators, which run in a synchronous context but need
11+
an event loop for internal async operations.
12+
"""
13+
try:
14+
asyncio.get_running_loop()
15+
except RuntimeError:
16+
loop = asyncio.new_event_loop()
17+
asyncio.set_event_loop(loop)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
from azure.durable_functions.models.Task import TaskBase
4+
5+
6+
class YieldException(BaseException):
7+
"""Exception raised when an orchestrator should yield control."""
8+
9+
def __init__(self, task: TaskBase):
10+
super().__init__("Orchestrator should yield.")
11+
self.task = task

0 commit comments

Comments
 (0)