Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
026c572
Creation of DTS example and passing of completionToken
RyanLettieri Jan 22, 2025
136a3d0
Addressing review feedback
RyanLettieri Jan 22, 2025
6df1064
Reverting dapr readme
RyanLettieri Jan 22, 2025
f731c0d
Adding accessTokenManager class for refreshing credential token
RyanLettieri Jan 24, 2025
eb98416
Adding comments to the example
RyanLettieri Jan 24, 2025
0de338d
Adding in requirement for azure-identity
RyanLettieri Jan 24, 2025
6050771
Moving dts logic into its own module
RyanLettieri Jan 28, 2025
f4f98ee
Fixing whitespace
RyanLettieri Jan 28, 2025
ea837d0
Updating dts client to refresh token
RyanLettieri Jan 29, 2025
f8d79d3
Cleaning up construction of dts objects and improving examples
RyanLettieri Jan 29, 2025
1e67651
Migrating shared access token logic to new grpc class
RyanLettieri Feb 4, 2025
6b1bfd2
Adding log statements to access_token_manager
RyanLettieri Feb 5, 2025
bd56a35
breaking for loop when setting interceptors
RyanLettieri Feb 5, 2025
efc0146
Removing changes to client.py and adding additional steps to readme.md
RyanLettieri Feb 7, 2025
3fd0b08
Refactoring client and worker to pass around interceptors
RyanLettieri Feb 11, 2025
4260d02
Fixing import for DefaultClientInterceptorImpl
RyanLettieri Feb 11, 2025
ec4617c
Addressing round 1 of feedback
RyanLettieri Feb 11, 2025
ed733ea
Fixing interceptor issue
RyanLettieri Feb 12, 2025
99f62d7
Moving some files around to remove dependencies
RyanLettieri Feb 12, 2025
f9d55ab
Addressing more feedback
RyanLettieri Feb 12, 2025
ba1ac4f
More review feedback
RyanLettieri Feb 12, 2025
2c251ea
Passing token credential as an argument rather than 2 strings
RyanLettieri Feb 13, 2025
9c65176
More review feedback for token passing
RyanLettieri Feb 13, 2025
877dabb
Addressing None comment and using correct metadata
RyanLettieri Feb 13, 2025
b39ffad
Updating unit tests
RyanLettieri Feb 13, 2025
33c8b11
Fixing the type for the unit test
RyanLettieri Feb 13, 2025
1da819e
Fixing grpc calls
RyanLettieri Feb 13, 2025
f690264
Merge branch 'main' into durabletask-scheduler
RyanLettieri Feb 13, 2025
6142220
Fix linter errors and update documentation
cgillum Feb 14, 2025
58f4f93
Specifying version requirement for pyproject.toml
RyanLettieri Feb 18, 2025
d82c1b7
Updating README
RyanLettieri Feb 18, 2025
b3a099e
Adding comment for credential type
RyanLettieri Feb 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(self, *,
log_handler: Optional[logging.Handler] = None,
log_formatter: Optional[logging.Formatter] = None,
secure_channel: bool = False):
self._metadata = metadata
channel = shared.get_grpc_channel(host_address, metadata, secure_channel=secure_channel)
self._stub = stubs.TaskHubSidecarServiceStub(channel)
self._logger = shared.get_logger("client", log_handler, log_formatter)
Expand Down
21 changes: 13 additions & 8 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,11 @@ def run_loop():
request_type = work_item.WhichOneof('request')
self._logger.debug(f'Received "{request_type}" work item')
if work_item.HasField('orchestratorRequest'):
executor.submit(self._execute_orchestrator, work_item.orchestratorRequest, stub)
executor.submit(self._execute_orchestrator, work_item.orchestratorRequest, stub, work_item.completionToken)
elif work_item.HasField('activityRequest'):
executor.submit(self._execute_activity, work_item.activityRequest, stub)
executor.submit(self._execute_activity, work_item.activityRequest, stub, work_item.completionToken)
elif work_item.HasField('healthPing'):
pass # no-op
else:
self._logger.warning(f'Unexpected work item type: {request_type}')

Expand Down Expand Up @@ -184,39 +186,42 @@ def stop(self):
self._logger.info("Worker shutdown completed")
self._is_running = False

def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub):
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub, completionToken):
try:
executor = _OrchestrationExecutor(self._registry, self._logger)
result = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
res = pb.OrchestratorResponse(
instanceId=req.instanceId,
actions=result.actions,
customStatus=pbh.get_string_value(result.encoded_custom_status))
customStatus=pbh.get_string_value(result.encoded_custom_status),
completionToken=completionToken)
except Exception as ex:
self._logger.exception(f"An error occurred while trying to execute instance '{req.instanceId}': {ex}")
failure_details = pbh.new_failure_details(ex)
actions = [pbh.new_complete_orchestration_action(-1, pb.ORCHESTRATION_STATUS_FAILED, "", failure_details)]
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions)
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions, completionToken=completionToken)

try:
stub.CompleteOrchestratorTask(res)
except Exception as ex:
self._logger.exception(f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}")

def _execute_activity(self, req: pb.ActivityRequest, stub: stubs.TaskHubSidecarServiceStub):
def _execute_activity(self, req: pb.ActivityRequest, stub: stubs.TaskHubSidecarServiceStub, completionToken):
instance_id = req.orchestrationInstance.instanceId
try:
executor = _ActivityExecutor(self._registry, self._logger)
result = executor.execute(instance_id, req.name, req.taskId, req.input.value)
res = pb.ActivityResponse(
instanceId=instance_id,
taskId=req.taskId,
result=pbh.get_string_value(result))
result=pbh.get_string_value(result),
completionToken=completionToken)
except Exception as ex:
res = pb.ActivityResponse(
instanceId=instance_id,
taskId=req.taskId,
failureDetails=pbh.new_failure_details(ex))
failureDetails=pbh.new_failure_details(ex),
completionToken=completionToken)

try:
stub.CompleteActivityTask(res)
Expand Down
2 changes: 1 addition & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ All the examples assume that you have a Durable Task-compatible sidecar running

1. Install the latest version of the [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/), which contains and exposes an embedded version of the Durable Task engine. The setup process (which requires Docker) will configure the workflow engine to store state in a local Redis container.

1. Clone and run the [Durable Task Sidecar](https://github.com/microsoft/durabletask-go) project locally (requires Go 1.18 or higher). Orchestration state will be stored in a local sqlite database.
2. Clone and run the [Durable Task Sidecar](https://github.com/microsoft/durabletask-go) project locally (requires Go 1.18 or higher). Orchestration state will be stored in a local sqlite database.

## Running the examples

Expand Down
41 changes: 41 additions & 0 deletions examples/dts/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Examples

This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK in conjunction with the Durable Task Scheduler (DTS).

## Prerequisites

All the examples assume that you have a Durable Task Scheduler taskhub created.

The simplest way to create a taskhub is by using the az cli commands:

1. Create a scheduler:

    ```sh
    az durabletask scheduler create --resource-group <testrg> --name <testscheduler> --location <eastus> --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{}"
    ```

2. Create your taskhub:

    ```sh
    az durabletask taskhub create --resource-group <testrg> --scheduler-name <testscheduler> --name <testtaskhub>
    ```

3. Retrieve the endpoint for the scheduler. This can be done by locating the taskhub in the portal.

4. Set the appropriate environment variables for the TASKHUB and ENDPOINT

```sh
export TASKHUB=<taskhubname>
```

```sh
export ENDPOINT=<taskhubEndpoint>
```

5. Since the samples rely on the `azure-identity` package for authentication, ensure it is installed and up-to-date:

```sh
python3 -m pip install azure-identity
```

## Running the examples

With the task hub created and the `TASKHUB` and `ENDPOINT` environment variables set, you can simply execute any of the examples in this directory using `python3`:

```sh
python3 dts_activity_sequence.py
```
71 changes: 71 additions & 0 deletions examples/dts/dts_activity_sequence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
from durabletask import client, task
from externalpackages.durabletaskscheduler.durabletask_scheduler_worker import DurableTaskSchedulerWorker
from externalpackages.durabletaskscheduler.durabletask_scheduler_client import DurableTaskSchedulerClient
from externalpackages.durabletaskscheduler.access_token_manager import AccessTokenManager

def hello(ctx: task.ActivityContext, name: str) -> str:
    """Activity that builds a greeting for *name* (the context is unused)."""
    greeting = 'Hello {}!'.format(name)
    return greeting


def sequence(ctx: task.OrchestrationContext, _):
    """Orchestrator that runs the 'hello' activity three times, one after
    another, and returns the collected greetings as a list."""
    cities = ['Tokyo', 'Seattle', 'London']
    greetings = []
    for city in cities:
        # Each yield suspends until the corresponding activity completes.
        greetings.append((yield ctx.call_activity(hello, input=city)))
    return greetings


# Read the task hub name from the environment (see examples/dts/README.md).
taskhub_name = os.getenv("TASKHUB")

# Check if the variable exists; otherwise print setup instructions and exit.
if taskhub_name:
    print(f"The value of TASKHUB is: {taskhub_name}")
else:
    print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use")
    print("If you are using windows powershell, run the following: $env:TASKHUB=\"<taskhubname>\"")
    print("If you are using bash, run the following: export TASKHUB=\"<taskhubname>\"")
    exit()

# Read the scheduler endpoint from the environment.
endpoint = os.getenv("ENDPOINT")

# Check if the variable exists; otherwise print setup instructions and exit.
if endpoint:
    print(f"The value of ENDPOINT is: {endpoint}")
else:
    print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler")
    print("If you are using windows powershell, run the following: $env:ENDPOINT=\"<schedulerEndpoint>\"")
    print("If you are using bash, run the following: export ENDPOINT=\"<schedulerEndpoint>\"")
    exit()


# OAuth scope requested when acquiring tokens for the Durable Task Scheduler.
# NOTE(review): the variable is named arm_scope but this scope targets
# durabletask.io, not Azure Resource Manager — consider renaming.
arm_scope = "https://durabletask.io/.default"
token_manager = AccessTokenManager(scope = arm_scope)

# gRPC metadata identifying the target task hub on every call.
meta_data: list[tuple[str, str]] = [
    ("taskhub", taskhub_name)
]

# configure and start the worker
with DurableTaskSchedulerWorker(host_address=endpoint, metadata=meta_data, secure_channel=True, access_token_manager=token_manager) as w:
    w.add_orchestrator(sequence)
    w.add_activity(hello)
    w.start()

    # Construct the client and run the orchestrations
    c = DurableTaskSchedulerClient(host_address=endpoint, metadata=meta_data, secure_channel=True, access_token_manager=token_manager)
    instance_id = c.schedule_new_orchestration(sequence)
    # state is None if the orchestration did not complete within the timeout.
    state = c.wait_for_orchestration_completion(instance_id, timeout=45)
    if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
        print(f'Orchestration completed! Result: {state.serialized_output}')
    elif state:
        print(f'Orchestration failed: {state.failure_details}')
99 changes: 99 additions & 0 deletions examples/dts/dts_fanout_fanin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""End-to-end sample that demonstrates how to configure an orchestrator
that a dynamic number activity functions in parallel, waits for them all
to complete, and prints an aggregate summary of the outputs."""
import random
import time
import os
from durabletask import client, task
from durabletask import client, task
from externalpackages.durabletaskscheduler.durabletask_scheduler_worker import DurableTaskSchedulerWorker
from externalpackages.durabletaskscheduler.durabletask_scheduler_client import DurableTaskSchedulerClient
from externalpackages.durabletaskscheduler.access_token_manager import AccessTokenManager


def get_work_items(ctx: task.ActivityContext, _) -> list[str]:
    """Activity that fabricates a randomly sized batch of work-item names."""
    how_many = random.randint(2, 10)
    print(f'generating {how_many} work items...')
    # Names are simply "work item 0", "work item 1", ...
    return [f'work item {i}' for i in range(how_many)]


def process_work_item(ctx: task.ActivityContext, item: str) -> int:
    """Activity that simulates processing *item* and returns a random score."""
    print(f'processing work item: {item}')

    # Simulate variable-duration work (anywhere up to ~5 seconds).
    delay = random.random() * 5
    time.sleep(delay)

    # The "result" for the work item is just a random score in [0, 10].
    return random.randint(0, 10)


def orchestrator(ctx: task.OrchestrationContext, _):
    """Fan-out/fan-in orchestrator: fetches a batch of work items, processes
    them all in parallel, then returns an aggregate summary of the results."""

    work_items: list[str] = yield ctx.call_activity(get_work_items)

    # Fan out: schedule one processing task per item, then fan in by
    # awaiting all of them at once.
    pending = [ctx.call_activity(process_work_item, input=wi) for wi in work_items]
    results: list[int] = yield task.when_all(pending)

    summary = {
        'work_items': work_items,
        'results': results,
        'total': sum(results),
    }
    return summary


# Read the task hub name from the environment (see examples/dts/README.md).
taskhub_name = os.getenv("TASKHUB")

# Check if the variable exists; otherwise print setup instructions and exit.
if taskhub_name:
    print(f"The value of TASKHUB is: {taskhub_name}")
else:
    print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use")
    print("If you are using windows powershell, run the following: $env:TASKHUB=\"<taskhubname>\"")
    print("If you are using bash, run the following: export TASKHUB=\"<taskhubname>\"")
    exit()

# Read the scheduler endpoint from the environment.
endpoint = os.getenv("ENDPOINT")

# Check if the variable exists; otherwise print setup instructions and exit.
if endpoint:
    print(f"The value of ENDPOINT is: {endpoint}")
else:
    print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler")
    print("If you are using windows powershell, run the following: $env:ENDPOINT=\"<schedulerEndpoint>\"")
    print("If you are using bash, run the following: export ENDPOINT=\"<schedulerEndpoint>\"")
    exit()

# OAuth scope requested when acquiring tokens for the Durable Task Scheduler.
# NOTE(review): the variable is named arm_scope but this scope targets
# durabletask.io, not Azure Resource Manager — consider renaming.
arm_scope = "https://durabletask.io/.default"
token_manager = AccessTokenManager(scope = arm_scope)

# gRPC metadata identifying the target task hub on every call.
meta_data: list[tuple[str, str]] = [
    ("taskhub", taskhub_name)
]


# configure and start the worker
with DurableTaskSchedulerWorker(host_address=endpoint, metadata=meta_data, secure_channel=True, access_token_manager=token_manager) as w:
    w.add_orchestrator(orchestrator)
    w.add_activity(process_work_item)
    w.add_activity(get_work_items)
    w.start()

    # create a client, start an orchestration, and wait for it to finish
    c = DurableTaskSchedulerClient(host_address=endpoint, metadata=meta_data, secure_channel=True, access_token_manager=token_manager)
    instance_id = c.schedule_new_orchestration(orchestrator)
    # state is None if the orchestration did not complete within the timeout.
    state = c.wait_for_orchestration_completion(instance_id, timeout=30)
    if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
        print(f'Orchestration completed! Result: {state.serialized_output}')
    elif state:
        print(f'Orchestration failed: {state.failure_details}')
7 changes: 7 additions & 0 deletions externalpackages/durabletaskscheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Support package for using the Durable Task Python SDK with the
Azure Durable Task Scheduler (DTS)."""


# Canonical name of this package, for identification/telemetry purposes.
PACKAGE_NAME = "durabletaskscheduler"
29 changes: 29 additions & 0 deletions externalpackages/durabletaskscheduler/access_token_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from datetime import datetime, timedelta, timezone

from azure.identity import DefaultAzureCredential

class AccessTokenManager:
    """Caches an Azure AD bearer token for a given scope and refreshes it
    automatically shortly before it expires.

    All datetimes are kept as naive UTC values. The token is refreshed
    ``refresh_buffer`` seconds ahead of its real expiry so that a token is
    never handed out when it is about to die in flight.
    """

    def __init__(self, scope: str, refresh_buffer: int = 60):
        # OAuth scope to request, e.g. "https://durabletask.io/.default".
        self.scope = scope
        # Seconds of safety margin before the real expiry.
        self.refresh_buffer = refresh_buffer
        self.credential = DefaultAzureCredential()
        self.token = None        # cached "Bearer <jwt>" string, or None
        self.expiry_time = None  # naive-UTC expiry of the cached token, or None

    def get_access_token(self) -> str:
        """Return a valid "Bearer ..." string, refreshing it first when it is
        missing or close to expiry."""
        if self.token is None or self.is_token_expired():
            self.refresh_token()
        return self.token

    def is_token_expired(self) -> bool:
        """True when no expiry is known or the token is within
        ``refresh_buffer`` seconds of expiring."""
        if self.expiry_time is None:
            return True
        return datetime.utcnow() >= (self.expiry_time - timedelta(seconds=self.refresh_buffer))

    def refresh_token(self):
        """Fetch a new token from the credential and record its expiry.

        Bug fix: the expiry was previously reconstructed from two separate
        ``utcnow()`` calls combined with ``datetime.utcnow().timestamp()``,
        which is racy, truncates sub-second precision, and — because
        ``timestamp()`` interprets a naive datetime as *local* time — skews
        the expiry by the host's UTC offset on non-UTC machines. The epoch
        timestamp returned by the credential is now converted directly.
        """
        new_token = self.credential.get_token(self.scope)
        self.token = f"Bearer {new_token.token}"
        # expires_on is POSIX epoch seconds; convert to naive UTC so it is
        # comparable with the utcnow() value used in is_token_expired().
        self.expiry_time = datetime.fromtimestamp(new_token.expires_on, tz=timezone.utc).replace(tzinfo=None)
        print(f"Token refreshed. Expires at: {self.expiry_time}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from durabletask.client import TaskHubGrpcClient
from externalpackages.durabletaskscheduler.access_token_manager import AccessTokenManager

class DurableTaskSchedulerClient(TaskHubGrpcClient):
    """TaskHubGrpcClient variant for the Azure Durable Task Scheduler.

    Before every outgoing operation the current Azure AD access token is
    written into the gRPC metadata under the ``authorization`` key, so
    long-lived clients keep working after the initial token expires.
    """

    def __init__(self, *args, access_token_manager: AccessTokenManager, **kwargs):
        # Initialize the base class; it stores the caller-supplied metadata
        # on self._metadata.
        super().__init__(*args, **kwargs)
        self._access_token_manager = access_token_manager
        # The base class allows metadata to be None; normalize to a list so
        # the authorization entry can be attached below.
        if self._metadata is None:
            self._metadata = []
        self.__update_metadata_with_token()

    def __update_metadata_with_token(self):
        """
        Add or update the `authorization` key in the metadata with the
        current access token.

        NOTE(review): the base class builds its gRPC channel from the
        metadata at construction time — confirm that mutating self._metadata
        here is actually observed by subsequent calls.
        """
        if self._access_token_manager is None:
            return
        token = self._access_token_manager.get_access_token()

        # Replace an existing "authorization" entry in place if present...
        for i, (key, _) in enumerate(self._metadata):
            if key == "authorization":
                self._metadata[i] = ("authorization", token)
                break
        else:
            # ...otherwise append a new entry.
            self._metadata.append(("authorization", token))

    # Each wrapper below refreshes the token, then delegates to the base
    # class. Bug fix: every wrapper now *returns* the base-class result;
    # previously only schedule_new_orchestration did, so calls such as
    # wait_for_orchestration_completion() always returned None to callers.

    def schedule_new_orchestration(self, *args, **kwargs) -> str:
        self.__update_metadata_with_token()
        return super().schedule_new_orchestration(*args, **kwargs)

    def get_orchestration_state(self, *args, **kwargs):
        self.__update_metadata_with_token()
        return super().get_orchestration_state(*args, **kwargs)

    def wait_for_orchestration_start(self, *args, **kwargs):
        self.__update_metadata_with_token()
        return super().wait_for_orchestration_start(*args, **kwargs)

    def wait_for_orchestration_completion(self, *args, **kwargs):
        self.__update_metadata_with_token()
        return super().wait_for_orchestration_completion(*args, **kwargs)

    def raise_orchestration_event(self, *args, **kwargs):
        self.__update_metadata_with_token()
        return super().raise_orchestration_event(*args, **kwargs)

    def terminate_orchestration(self, *args, **kwargs):
        self.__update_metadata_with_token()
        return super().terminate_orchestration(*args, **kwargs)

    def suspend_orchestration(self, *args, **kwargs):
        self.__update_metadata_with_token()
        return super().suspend_orchestration(*args, **kwargs)

    def resume_orchestration(self, *args, **kwargs):
        self.__update_metadata_with_token()
        return super().resume_orchestration(*args, **kwargs)

    def purge_orchestration(self, *args, **kwargs):
        self.__update_metadata_with_token()
        return super().purge_orchestration(*args, **kwargs)
Loading
Loading