Skip to content

Commit d1fa392

Browse files
committed
Merge branch 'durable-openai-agent' into philliphoff-more-serialization
2 parents 8b5c4f9 + 3235404 commit d1fa392

File tree

11 files changed

+963
-28
lines changed

11 files changed

+963
-28
lines changed

azure/durable_functions/openai_agents/context.py

Lines changed: 109 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from typing import Any, Callable, Optional
1+
import json
2+
from typing import Any, Callable, Optional, TYPE_CHECKING, Union
23

34
from azure.durable_functions.models.DurableOrchestrationContext import (
45
DurableOrchestrationContext,
@@ -8,11 +9,44 @@
89
from agents import RunContextWrapper, Tool
910
from agents.function_schema import function_schema
1011
from agents.tool import FunctionTool
12+
13+
from azure.durable_functions.models.Task import TaskBase
1114
from .task_tracker import TaskTracker
1215

1316

14-
class DurableAIAgentContext:
15-
"""Context for AI agents running in Azure Durable Functions orchestration."""
17+
if TYPE_CHECKING:
18+
# At type-check time we want all members / signatures for IDE & linters.
19+
_BaseDurableContext = DurableOrchestrationContext
20+
else:
21+
class _BaseDurableContext: # lightweight runtime stub
22+
"""Runtime stub base class for delegation; real context is wrapped.
23+
24+
At runtime we avoid inheriting from DurableOrchestrationContext so that
25+
attribute lookups for its members are delegated via __getattr__ to the
26+
wrapped ``_context`` instance.
27+
"""
28+
29+
__slots__ = ()
30+
31+
32+
class DurableAIAgentContext(_BaseDurableContext):
33+
"""Context for AI agents running in Azure Durable Functions orchestration.
34+
35+
Design
36+
------
37+
* Static analysis / IDEs: Appears to subclass ``DurableOrchestrationContext`` so
38+
you get autocompletion and type hints (under TYPE_CHECKING branch).
39+
* Runtime: Inherits from a trivial stub. All durable orchestration operations
40+
are delegated to the real ``DurableOrchestrationContext`` instance provided
41+
as ``context`` and stored in ``_context``.
42+
43+
Consequences
44+
------------
45+
* ``isinstance(DurableAIAgentContext, DurableOrchestrationContext)`` is **False** at
46+
runtime (expected).
47+
* Delegation via ``__getattr__`` works for every member of the real context.
48+
* No reliance on internal initialization side-effects of the durable SDK.
49+
"""
1650

1751
def __init__(
1852
self,
@@ -24,28 +58,56 @@ def __init__(
2458
self._task_tracker = task_tracker
2559
self._model_retry_options = model_retry_options
2660

27-
def call_activity(self, activity_name, input: str):
28-
"""Call an activity function and record the activity call."""
29-
task = self._context.call_activity(activity_name, input)
61+
def call_activity(
62+
self, name: Union[str, Callable], input_: Optional[Any] = None
63+
) -> TaskBase:
64+
"""Schedule an activity for execution.
65+
66+
Parameters
67+
----------
68+
name: str | Callable
69+
Either the name of the activity function to call, as a string or,
70+
in the Python V2 programming model, the activity function itself.
71+
input_: Optional[Any]
72+
The JSON-serializable input to pass to the activity function.
73+
74+
Returns
75+
-------
76+
Task
77+
A Durable Task that completes when the called activity function completes or fails.
78+
"""
79+
task = self._context.call_activity(name, input_)
3080
self._task_tracker.record_activity_call()
3181
return task
3282

3383
def call_activity_with_retry(
34-
self, activity_name, retry_options: RetryOptions, input: str = None
35-
):
36-
"""Call an activity function with retry options and record the activity call."""
37-
task = self._context.call_activity_with_retry(activity_name, retry_options, input)
84+
self,
85+
name: Union[str, Callable],
86+
retry_options: RetryOptions,
87+
input_: Optional[Any] = None,
88+
) -> TaskBase:
89+
"""Schedule an activity for execution with retry options.
90+
91+
Parameters
92+
----------
93+
name: str | Callable
94+
Either the name of the activity function to call, as a string or,
95+
in the Python V2 programming model, the activity function itself.
96+
retry_options: RetryOptions
97+
The retry options for the activity function.
98+
input_: Optional[Any]
99+
The JSON-serializable input to pass to the activity function.
100+
101+
Returns
102+
-------
103+
Task
104+
A Durable Task that completes when the called activity function completes or
105+
fails completely.
106+
"""
107+
task = self._context.call_activity_with_retry(name, retry_options, input_)
38108
self._task_tracker.record_activity_call()
39109
return task
40110

41-
def set_custom_status(self, status: str):
42-
"""Set custom status for the orchestration."""
43-
self._context.set_custom_status(status)
44-
45-
def wait_for_external_event(self, event_name: str):
46-
"""Wait for an external event in the orchestration."""
47-
return self._context.wait_for_external_event(event_name)
48-
49111
def create_activity_tool(
50112
self,
51113
activity_func: Callable,
@@ -77,13 +139,29 @@ def create_activity_tool(
77139
else:
78140
activity_name = activity_func._function._name
79141

142+
input_name = None
143+
if (activity_func._function._trigger is not None
144+
and hasattr(activity_func._function._trigger, 'name')):
145+
input_name = activity_func._function._trigger.name
146+
80147
async def run_activity(ctx: RunContextWrapper[Any], input: str) -> Any:
148+
# Parse JSON input and extract the named value if input_name is specified
149+
activity_input = input
150+
if input_name:
151+
try:
152+
parsed_input = json.loads(input)
153+
if isinstance(parsed_input, dict) and input_name in parsed_input:
154+
activity_input = parsed_input[input_name]
155+
# If parsing fails or the named parameter is not found, pass the original input
156+
except (json.JSONDecodeError, TypeError):
157+
pass
158+
81159
if retry_options:
82160
result = self._task_tracker.get_activity_call_result_with_retry(
83-
activity_name, retry_options, input
161+
activity_name, retry_options, activity_input
84162
)
85163
else:
86-
result = self._task_tracker.get_activity_call_result(activity_name, input)
164+
result = self._task_tracker.get_activity_call_result(activity_name, activity_input)
87165
return result
88166

89167
schema = function_schema(
@@ -101,3 +179,14 @@ async def run_activity(ctx: RunContextWrapper[Any], input: str) -> Any:
101179
on_invoke_tool=run_activity,
102180
strict_json_schema=True,
103181
)
182+
183+
def __getattr__(self, name):
184+
"""Delegate missing attributes to the underlying DurableOrchestrationContext."""
185+
try:
186+
return getattr(self._context, name)
187+
except AttributeError:
188+
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
189+
190+
def __dir__(self):
191+
"""Improve introspection and tab-completion by including delegated attributes."""
192+
return sorted(set(dir(type(self)) + list(self.__dict__) + dir(self._context)))

azure/durable_functions/openai_agents/model_invocation_activity.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,16 @@ class ActivityModelInput(BaseModel):
163163

164164
def to_json(self) -> str:
165165
"""Convert the ActivityModelInput to a JSON string."""
166-
return self.model_dump_json()
166+
try:
167+
return self.model_dump_json(warnings=False)
168+
except Exception:
169+
# Fallback to basic JSON serialization
170+
try:
171+
return json.dumps(self.model_dump(warnings=False), default=str)
172+
except Exception as fallback_error:
173+
raise ValueError(
174+
f"Unable to serialize ActivityModelInput: {fallback_error}"
175+
) from fallback_error
167176

168177
@classmethod
169178
def from_json(cls, json_str: str) -> 'ActivityModelInput':
@@ -286,6 +295,7 @@ async def get_response(
286295
*,
287296
previous_response_id: Optional[str],
288297
prompt: Optional[ResponsePromptParam],
298+
conversation_id: Optional[str] = None,
289299
) -> ModelResponse:
290300
"""Get a response from the model."""
291301
def make_tool_info(tool: Tool) -> ToolInput:

azure/durable_functions/openai_agents/orchestrator_generator.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from .runner import DurableOpenAIRunner
1010
from .context import DurableAIAgentContext
1111
from .event_loop import ensure_event_loop
12+
from .usage_telemetry import UsageTelemetry
1213

1314

1415
async def durable_openai_agent_activity(input: str, model_provider: ModelProvider) -> str:
@@ -18,9 +19,27 @@ async def durable_openai_agent_activity(input: str, model_provider: ModelProvide
1819
model_invoker = ModelInvoker(model_provider=model_provider)
1920
response = await model_invoker.invoke_model_activity(activity_input)
2021

21-
activity_output = ActivityModelOutput(response=response)
22+
# Use safe/public Pydantic API when possible. Prefer model_dump_json if result is a BaseModel
23+
# Otherwise handle common types (str/bytes/dict/list) and fall back to json.dumps.
24+
import json as _json
2225

23-
return activity_output.to_json()
26+
if hasattr(result, "model_dump_json"):
27+
# Pydantic v2 BaseModel
28+
json_str = result.model_dump_json()
29+
else:
30+
if isinstance(result, bytes):
31+
json_str = result.decode()
32+
elif isinstance(result, str):
33+
json_str = result
34+
else:
35+
# Try the internal serializer as a last resort, but fall back to json.dumps
36+
try:
37+
json_bytes = ModelResponse.__pydantic_serializer__.to_json(result)
38+
json_str = json_bytes.decode()
39+
except Exception:
40+
json_str = _json.dumps(result)
41+
42+
return json_str
2443

2544

2645
def durable_openai_agent_orchestrator_generator(
@@ -30,6 +49,9 @@ def durable_openai_agent_orchestrator_generator(
3049
activity_name: str,
3150
):
3251
"""Adapts the synchronous OpenAI Agents function to an Durable orchestrator generator."""
52+
# Log versions the first time this generator is invoked
53+
UsageTelemetry.log_usage_once()
54+
3355
ensure_event_loop()
3456
task_tracker = TaskTracker(durable_orchestration_context)
3557
durable_ai_agent_context = DurableAIAgentContext(

azure/durable_functions/openai_agents/task_tracker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ def _get_activity_result_or_raise(self, task):
5252
result = json.loads(result_json)
5353
return result
5454

55-
def get_activity_call_result(self, activity_name, input: str):
55+
def get_activity_call_result(self, activity_name, input: Any):
5656
"""Call an activity and return its result or raise ``YieldException`` if pending."""
5757
task = self._context.call_activity(activity_name, input)
5858
return self._get_activity_result_or_raise(task)
5959

6060
def get_activity_call_result_with_retry(
61-
self, activity_name, retry_options: RetryOptions, input: str
61+
self, activity_name, retry_options: RetryOptions, input: Any
6262
):
6363
"""Call an activity with retry and return its result or raise YieldException if pending."""
6464
task = self._context.call_activity_with_retry(activity_name, retry_options, input)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
5+
class UsageTelemetry:
6+
"""Handles telemetry logging for OpenAI Agents SDK integration usage."""
7+
8+
# Class-level flag to ensure logging happens only once across all instances
9+
_usage_logged = False
10+
11+
@classmethod
12+
def log_usage_once(cls):
13+
"""Log OpenAI Agents SDK integration usage exactly once.
14+
15+
Fails gracefully if metadata cannot be retrieved.
16+
"""
17+
if cls._usage_logged:
18+
return
19+
20+
# NOTE: Any log line beginning with the special prefix defined below will be
21+
# captured by the Azure Functions host as a Language Worker console log and
22+
# forwarded to internal telemetry pipelines.
23+
# Do not change this constant value without coordinating with the Functions
24+
# host team.
25+
LANGUAGE_WORKER_CONSOLE_LOG_PREFIX = "LanguageWorkerConsoleLog"
26+
27+
package_versions = cls._collect_openai_agent_package_versions()
28+
msg = (
29+
f"{LANGUAGE_WORKER_CONSOLE_LOG_PREFIX}" # Prefix captured by Azure Functions host
30+
"Detected OpenAI Agents SDK integration with Durable Functions. "
31+
f"Package versions: {package_versions}"
32+
)
33+
print(msg)
34+
35+
cls._usage_logged = True
36+
37+
@classmethod
38+
def _collect_openai_agent_package_versions(cls) -> str:
39+
"""Collect versions of relevant packages for telemetry logging.
40+
41+
Returns
42+
-------
43+
str
44+
Comma-separated list of name=version entries or "(unavailable)" if
45+
versions could not be determined.
46+
"""
47+
try:
48+
try:
49+
from importlib import metadata # Python 3.8+
50+
except ImportError: # pragma: no cover - legacy fallback
51+
import importlib_metadata as metadata # type: ignore
52+
53+
package_names = [
54+
"azure-functions-durable",
55+
"openai",
56+
"openai-agents",
57+
]
58+
59+
versions = []
60+
for package_name in package_names:
61+
try:
62+
ver = metadata.version(package_name)
63+
versions.append(f"{package_name}={ver}")
64+
except Exception: # noqa: BLE001 - swallow and continue
65+
versions.append(f"{package_name}=(not installed)")
66+
67+
return ", ".join(versions) if versions else "(unavailable)"
68+
except Exception: # noqa: BLE001 - never let version gathering break user code
69+
return "(unavailable)"

samples-v2/openai_agents/function_app.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import random
23

34
import azure.functions as func
45
import azure.durable_functions as df
@@ -102,4 +103,13 @@ def tools(context):
102103
import basic.tools
103104
return basic.tools.main()
104105

106+
@app.activity_trigger(input_name="max")
107+
async def random_number_tool(max: int) -> int:
108+
"""Return a random integer between 0 and the given maximum."""
109+
return random.randint(0, max)
105110

111+
@app.orchestration_trigger(context_name="context")
112+
@app.durable_openai_agent_orchestrator
113+
def message_filter(context):
114+
import handoffs.message_filter
115+
return handoffs.message_filter.main(context.create_activity_tool(random_number_tool))

0 commit comments

Comments
 (0)