Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 46 additions & 8 deletions azure/durable_functions/openai_agents/context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Callable, Optional, TYPE_CHECKING
from typing import Any, Callable, Optional, TYPE_CHECKING, Union

from azure.durable_functions.models.DurableOrchestrationContext import (
DurableOrchestrationContext,
Expand All @@ -8,6 +8,8 @@
from agents import RunContextWrapper, Tool
from agents.function_schema import function_schema
from agents.tool import FunctionTool

from azure.durable_functions.models.Task import TaskBase
from .task_tracker import TaskTracker


Expand Down Expand Up @@ -55,17 +57,53 @@ def __init__(
self._task_tracker = task_tracker
self._model_retry_options = model_retry_options

def call_activity(self, activity_name, input: str):
"""Call an activity function and record the activity call."""
task = self._context.call_activity(activity_name, input)
def call_activity(
self, name: Union[str, Callable], input_: Optional[Any] = None
) -> TaskBase:
"""Schedule an activity for execution.

Parameters
----------
name: str | Callable
Either the name of the activity function to call, as a string or,
in the Python V2 programming model, the activity function itself.
input_: Optional[Any]
The JSON-serializable input to pass to the activity function.

Returns
-------
Task
A Durable Task that completes when the called activity function completes or fails.
"""
task = self._context.call_activity(name, input_)
self._task_tracker.record_activity_call()
return task

def call_activity_with_retry(
self, activity_name, retry_options: RetryOptions, input: str = None
):
"""Call an activity function with retry options and record the activity call."""
task = self._context.call_activity_with_retry(activity_name, retry_options, input)
self,
name: Union[str, Callable],
retry_options: RetryOptions,
input_: Optional[Any] = None,
) -> TaskBase:
"""Schedule an activity for execution with retry options.

Parameters
----------
name: str | Callable
Either the name of the activity function to call, as a string or,
in the Python V2 programming model, the activity function itself.
retry_options: RetryOptions
The retry options for the activity function.
input_: Optional[Any]
The JSON-serializable input to pass to the activity function.

Returns
-------
Task
A Durable Task that completes when the called activity function completes or
fails completely.
"""
task = self._context.call_activity_with_retry(name, retry_options, input_)
self._task_tracker.record_activity_call()
return task

Expand Down