124 changes: 67 additions & 57 deletions azure/durable_functions/openai_agents/decorators.py
@@ -1,7 +1,9 @@
 from functools import wraps
 import inspect
 import sys
+from typing import Optional
 import azure.functions as func
+from agents import ModelProvider
 from agents.run import set_default_agent_runner
 from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext
 from .runner import DurableOpenAIRunner
@@ -15,14 +17,14 @@
 _registered_apps = set()


-def _setup_durable_openai_agent(app: func.FunctionApp):
+def _setup_durable_openai_agent(app: func.FunctionApp, model_provider: Optional[ModelProvider]):
     """Set up the Durable OpenAI Agent framework for the given FunctionApp.

     This is automatically called when using the framework decorators.
     """
     app_id = id(app)
     if app_id not in _registered_apps:
-        create_invoke_model_activity(app)
+        create_invoke_model_activity(app, model_provider)
         _registered_apps.add(app_id)

@@ -40,7 +42,7 @@ def _find_function_app_in_module(module):
     return None


-def _auto_setup_durable_openai_agent(decorated_func):
+def _auto_setup_durable_openai_agent(decorated_func, model_provider: Optional[ModelProvider]):
     """Automatically detect and setup the FunctionApp for Durable OpenAI Agents.

     This finds the FunctionApp in the same module as the decorated function.
@@ -54,67 +56,75 @@ def _auto_setup_durable_openai_agent(decorated_func):
         # Find the FunctionApp instance in that module
         app = _find_function_app_in_module(func_module)
         if app is not None:
-            _setup_durable_openai_agent(app)
+            _setup_durable_openai_agent(app, model_provider)
     except Exception:
         # Silently fail if auto-setup doesn't work
         # The user can still manually call create_invoke_model_activity if needed
         pass


-def durable_openai_agent_orchestrator(func):
+def durable_openai_agent_orchestrator(_func=None,
+                                      *,
+                                      model_provider: Optional[ModelProvider] = None):
     """Decorate Azure Durable Functions orchestrators that use OpenAI Agents."""
-    # Auto-setup: Find and configure the FunctionApp when decorator is applied
-    _auto_setup_durable_openai_agent(func)
-
-    @wraps(func)
-    def wrapper(durable_orchestration_context: DurableOrchestrationContext):
-        ensure_event_loop()
-        durable_ai_agent_context = DurableAIAgentContext(durable_orchestration_context)
-        durable_openai_runner = DurableOpenAIRunner(context=durable_ai_agent_context)
-        set_default_agent_runner(durable_openai_runner)
-
-        if inspect.isgeneratorfunction(func):
-            gen = iter(func(durable_ai_agent_context))
-            try:
-                # prime the subiterator
-                value = next(gen)
-                yield from durable_ai_agent_context._yield_and_clear_tasks()
-                while True:
-                    try:
-                        # send whatever was sent into us down to the subgenerator
-                        yield from durable_ai_agent_context._yield_and_clear_tasks()
-                        sent = yield value
-                    except GeneratorExit:
-                        # ensure the subgenerator is closed
-                        if hasattr(gen, "close"):
-                            gen.close()
-                        raise
-                    except BaseException as exc:
-                        # forward thrown exceptions if possible
-                        if hasattr(gen, "throw"):
-                            value = gen.throw(type(exc), exc, exc.__traceback__)
-                        else:
-                            raise
-                    else:
-                        # normal path: forward .send (or .__next__)
-                        if hasattr(gen, "send"):
-                            value = gen.send(sent)
-                        else:
-                            value = next(gen)
-            except StopIteration as e:
-                yield from durable_ai_agent_context._yield_and_clear_tasks()
-                return e.value
-            except YieldException as e:
-                yield from durable_ai_agent_context._yield_and_clear_tasks()
-                yield e.task
-        else:
-            try:
-                result = func(durable_ai_agent_context)
-                return result
-            except YieldException as e:
-                yield from durable_ai_agent_context._yield_and_clear_tasks()
-                yield e.task
-            finally:
-                yield from durable_ai_agent_context._yield_and_clear_tasks()
-
-    return wrapper
+    def wrapper_wrapper(func):
+        _auto_setup_durable_openai_agent(func, model_provider)
+
+        @wraps(func)
+        def wrapper(durable_orchestration_context: DurableOrchestrationContext):
+            ensure_event_loop()
+            durable_ai_agent_context = DurableAIAgentContext(durable_orchestration_context)
+            durable_openai_runner = DurableOpenAIRunner(context=durable_ai_agent_context)
+            set_default_agent_runner(durable_openai_runner)
+
+            if inspect.isgeneratorfunction(func):
+                gen = iter(func(durable_ai_agent_context))
+                try:
+                    # prime the subiterator
+                    value = next(gen)
+                    yield from durable_ai_agent_context._yield_and_clear_tasks()
+                    while True:
+                        try:
+                            # send whatever was sent into us down to the subgenerator
+                            yield from durable_ai_agent_context._yield_and_clear_tasks()
+                            sent = yield value
+                        except GeneratorExit:
+                            # ensure the subgenerator is closed
+                            if hasattr(gen, "close"):
+                                gen.close()
+                            raise
+                        except BaseException as exc:
+                            # forward thrown exceptions if possible
+                            if hasattr(gen, "throw"):
+                                value = gen.throw(type(exc), exc, exc.__traceback__)
+                            else:
+                                raise
+                        else:
+                            # normal path: forward .send (or .__next__)
+                            if hasattr(gen, "send"):
+                                value = gen.send(sent)
+                            else:
+                                value = next(gen)
+                except StopIteration as e:
+                    yield from durable_ai_agent_context._yield_and_clear_tasks()
+                    return e.value
+                except YieldException as e:
+                    yield from durable_ai_agent_context._yield_and_clear_tasks()
+                    yield e.task
+            else:
+                try:
+                    result = func(durable_ai_agent_context)
+                    return result
+                except YieldException as e:
+                    yield from durable_ai_agent_context._yield_and_clear_tasks()
+                    yield e.task
+                finally:
+                    yield from durable_ai_agent_context._yield_and_clear_tasks()

+        return wrapper
+
+    if _func is None:
+        return wrapper_wrapper
+    else:
+        return wrapper_wrapper(_func)
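For context on the reworked signature, here is a minimal usage sketch of the two decoration forms it supports. Only the decorator and its model_provider parameter come from the diff above; the orchestrator bodies, the Agent/Runner calls, and my_model_provider are illustrative assumptions, not code from this PR.

# Usage sketch (assumes `my_model_provider` is some ModelProvider instance you construct
# yourself, and that the usual Function App wiring exists elsewhere in the module).
from agents import Agent, Runner
from azure.durable_functions.openai_agents.decorators import durable_openai_agent_orchestrator

@durable_openai_agent_orchestrator                       # bare form: `_func` is the orchestrator,
def hello_orchestrator(context):                         # the default model provider is used
    agent = Agent(name="Assistant", instructions="You are a helpful assistant.")
    return Runner.run_sync(agent, "Say hello.").final_output

@durable_openai_agent_orchestrator(model_provider=my_model_provider)   # parameterized form:
def custom_provider_orchestrator(context):               # `_func` is None, so the call returns
    agent = Agent(name="Assistant", instructions="You are a helpful assistant.")   # `wrapper_wrapper`,
    return Runner.run_sync(agent, "Say hello.").final_output           # which then wraps the orchestrator

In the parameterized form, model_provider is forwarded through _auto_setup_durable_openai_agent into create_invoke_model_activity, so the activity uses the caller-supplied provider instead of the default.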
@@ -1,4 +1,3 @@
-from __future__ import annotations
 import enum
 import json
 import logging
@@ -394,20 +393,20 @@ def stream_response(
         tracing: ModelTracing,
         *,
         previous_response_id: Optional[str],
-        prompt: ResponsePromptParam | None,
+        prompt: Optional[ResponsePromptParam],
     ) -> AsyncIterator[TResponseStreamEvent]:
         raise NotImplementedError("Durable model doesn't support streams yet")


-def create_invoke_model_activity(app: func.FunctionApp):
+def create_invoke_model_activity(app: func.FunctionApp, model_provider: Optional[ModelProvider]):
     """Create and register the invoke_model_activity function with the provided FunctionApp."""

     @app.activity_trigger(input_name="input")
     async def invoke_model_activity(input: str):
         """Activity that handles OpenAI model invocations."""
         activity_input = ActivityModelInput.from_json(input)

-        model_invoker = ModelInvoker()
+        model_invoker = ModelInvoker(model_provider=model_provider)
         result = await model_invoker.invoke_model_activity(activity_input)

         json_obj = ModelResponse.__pydantic_serializer__.to_json(result)
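The model_provider argument now threads from the decorator all the way into ModelInvoker inside the activity. A rough sketch of what a caller-supplied provider could look like follows, assuming the ModelProvider/OpenAIProvider interfaces from the openai-agents SDK; the AzureModelProvider class name and the Azure client wiring are illustrative assumptions, not part of this PR.

# Hypothetical custom provider that routes model lookups through an Azure OpenAI client.
from openai import AsyncAzureOpenAI
from agents import ModelProvider
from agents.models.openai_provider import OpenAIProvider

class AzureModelProvider(ModelProvider):
    """Sketch: resolve models against a caller-supplied Azure OpenAI client."""

    def __init__(self, client: AsyncAzureOpenAI):
        # Delegate to the SDK's stock provider, bound to the Azure client.
        self._inner = OpenAIProvider(openai_client=client)

    def get_model(self, model_name):
        return self._inner.get_model(model_name)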
7 changes: 5 additions & 2 deletions requirements.txt
@@ -3,7 +3,7 @@ flake8-docstrings==1.5.0
 pytest==7.1.2
 python-dateutil==2.8.0
 requests==2.32.4
-jsonschema==3.2.0
+jsonschema==4.25.1
 aiohttp==3.12.14
 azure-functions>=1.11.3b3
 nox==2019.11.9
@@ -12,4 +12,7 @@ pytest-asyncio==0.20.2
 autopep8
 types-python-dateutil
 opentelemetry-api==1.32.1
-opentelemetry-sdk==1.32.1
+opentelemetry-sdk==1.32.1
+openai==1.98.0
Review thread on the added openai==1.98.0 line:

Owner:
Just wanted to double-check: my understanding is that adding openai and openai-agents here will not mean that every app installing azure-functions-durable will automatically install these packages as well, correct?

Collaborator (author):
That's my understanding as well: these are requirements just for the repo. As far as I know, the dependencies of the package are specified in setup.py. We do have to be careful, I believe, not to let imports of OpenAI dependencies leak into files that shouldn't necessarily require them. That is, to use "local" imports where necessary instead of imports at the top of the file.

+openai-agents==0.2.4
+eval_type_backport
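On the "local" imports point from the review thread above, a minimal sketch of the pattern: the OpenAI-related imports live inside a function body, so merely importing azure-functions-durable never requires openai or openai-agents to be installed. The helper name and its placement are hypothetical.

def _default_model_provider():
    # Deferred import: only executed when the OpenAI agent integration is actually used,
    # so apps that never touch it do not need openai/openai-agents installed.
    from agents.models.openai_provider import OpenAIProvider
    return OpenAIProvider()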