-
Notifications
You must be signed in to change notification settings - Fork 20
Creation of DTS example and passing of completionToken #40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
RyanLettieri
merged 32 commits into
microsoft:main
from
RyanLettieri:durabletask-scheduler
Feb 18, 2025
Merged
Changes from 13 commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
026c572
Creation of DTS example and passing of completionToken
RyanLettieri 136a3d0
Adressing review feedback
RyanLettieri 6df1064
Reverting dapr readme
RyanLettieri f731c0d
Adding accessTokenManager class for refreshing credential token
RyanLettieri eb98416
Adding comments to the example
RyanLettieri 0de338d
Adding in requirement for azure-identity
RyanLettieri 6050771
Moving dts logic into its own module
RyanLettieri f4f98ee
Fixing whitesapce
RyanLettieri ea837d0
Updating dts client to refresh token
RyanLettieri f8d79d3
Cleaning up construction of dts objects and improving examples
RyanLettieri 1e67651
Migrating shared access token logic to new grpc class
RyanLettieri 6b1bfd2
Adding log statements to access_token_manager
RyanLettieri bd56a35
breaking for loop when setting interceptors
RyanLettieri efc0146
Removing changes to client.py and adding additional steps to readme.md
RyanLettieri 3fd0b08
Refactoring client and worker to pass around interceptors
RyanLettieri 4260d02
Fixing import for DefaultClientInterceptorImpl
RyanLettieri ec4617c
Adressing round 1 of feedback
RyanLettieri ed733ea
Fixing interceptor issue
RyanLettieri 99f62d7
Moving some files around to remove dependencies
RyanLettieri f9d55ab
Adressing more feedback
RyanLettieri ba1ac4f
More review feedback
RyanLettieri 2c251ea
Passing token credential as an argument rather than 2 strings
RyanLettieri 9c65176
More review feedback for token passing
RyanLettieri 877dabb
Addressing None comment and using correct metadata
RyanLettieri b39ffad
Updating unit tests
RyanLettieri 33c8b11
Fixing the type for the unit test
RyanLettieri 1da819e
Fixing grpc calls
RyanLettieri f690264
Merge branch 'main' into durabletask-scheduler
RyanLettieri 6142220
Fix linter errors and update documentation
cgillum 58f4f93
Specifying version reqiuirement for pyproject.toml
RyanLettieri d82c1b7
Updating README
RyanLettieri b3a099e
Adding comment for credential type
RyanLettieri File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| # Examples | ||
|
|
||
| This directory contains examples of how to author durable orchestrations using the Durable Task Python SDK in conjunction with the Durable Task Scheduler (DTS). | ||
|
|
||
| ## Prerequisites | ||
|
|
||
| All the examples assume that you have a Durable Task Scheduler taskhub created. | ||
|
|
||
| The simplest way to create a taskhub is by using the az cli commands: | ||
|
|
||
| 1. Create a scheduler: | ||
| az durabletask scheduler create --resource-group <testrg> --name <testscheduler> --location <eastus> --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 --sku-name "Dedicated" --tags "{}" | ||
|
|
||
| 2. Create your taskhub | ||
| az durabletask taskhub create --resource-group <testrg> --scheduler-name <testscheduler> --name <testtaskhub> | ||
|
|
||
| 3. Retrieve the endpoint for the scheduler. This can be done by locating the taskhub in the portal. | ||
|
|
||
| 4. Set the appropriate environment variables for the TASKHUB and ENDPOINT | ||
|
|
||
| ```sh | ||
| export TASKHUB=<taskhubname> | ||
| ``` | ||
|
|
||
| ```sh | ||
| export ENDPOINT=<taskhubEndpoint> | ||
| ``` | ||
|
|
||
| 5. Since the samples rely on azure identity, ensure the package is installed and up-to-date | ||
|
|
||
| ```sh | ||
| python3 -m pip install azure-identity | ||
RyanLettieri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ``` | ||
|
|
||
| ## Running the examples | ||
|
|
||
| With one of the sidecars running, you can simply execute any of the examples in this directory using `python3`: | ||
|
|
||
| ```sh | ||
| python3 dts_activity_sequence.py | ||
| ``` | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| """End-to-end sample that demonstrates how to configure an orchestrator | ||
| that calls an activity function in a sequence and prints the outputs.""" | ||
| import os | ||
| from durabletask import client, task | ||
RyanLettieri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| from externalpackages.durabletaskscheduler.durabletask_scheduler_worker import DurableTaskSchedulerWorker | ||
| from externalpackages.durabletaskscheduler.durabletask_scheduler_client import DurableTaskSchedulerClient | ||
RyanLettieri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def hello(ctx: task.ActivityContext, name: str) -> str: | ||
| """Activity function that returns a greeting""" | ||
| return f'Hello {name}!' | ||
|
|
||
|
|
||
| def sequence(ctx: task.OrchestrationContext, _): | ||
| """Orchestrator function that calls the 'hello' activity function in a sequence""" | ||
| # call "hello" activity function in a sequence | ||
| result1 = yield ctx.call_activity(hello, input='Tokyo') | ||
| result2 = yield ctx.call_activity(hello, input='Seattle') | ||
| result3 = yield ctx.call_activity(hello, input='London') | ||
|
|
||
| # return an array of results | ||
| return [result1, result2, result3] | ||
|
|
||
|
|
||
| # Read the environment variable | ||
| taskhub_name = os.getenv("TASKHUB") | ||
|
|
||
| # Check if the variable exists | ||
| if taskhub_name: | ||
| print(f"The value of TASKHUB is: {taskhub_name}") | ||
| else: | ||
| print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use") | ||
| print("If you are using windows powershell, run the following: $env:TASKHUB=\"<taskhubname>\"") | ||
| print("If you are using bash, run the following: export TASKHUB=\"<taskhubname>\"") | ||
| exit() | ||
|
|
||
| # Read the environment variable | ||
| endpoint = os.getenv("ENDPOINT") | ||
|
|
||
| # Check if the variable exists | ||
| if endpoint: | ||
| print(f"The value of ENDPOINT is: {endpoint}") | ||
| else: | ||
| print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler") | ||
| print("If you are using windows powershell, run the following: $env:ENDPOINT=\"<schedulerEndpoint>\"") | ||
| print("If you are using bash, run the following: export ENDPOINT=\"<schedulerEndpoint>\"") | ||
| exit() | ||
|
|
||
|
|
||
| # configure and start the worker | ||
| with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, use_managed_identity=False, client_id="", taskhub=taskhub_name) as w: | ||
| w.add_orchestrator(sequence) | ||
| w.add_activity(hello) | ||
| w.start() | ||
|
|
||
| # Construct the client and run the orchestrations | ||
| c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, taskhub=taskhub_name) | ||
| instance_id = c.schedule_new_orchestration(sequence) | ||
| state = c.wait_for_orchestration_completion(instance_id, timeout=60) | ||
| if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: | ||
| print(f'Orchestration completed! Result: {state.serialized_output}') | ||
| elif state: | ||
| print(f'Orchestration failed: {state.failure_details}') | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| """End-to-end sample that demonstrates how to configure an orchestrator | ||
| that a dynamic number activity functions in parallel, waits for them all | ||
| to complete, and prints an aggregate summary of the outputs.""" | ||
| import random | ||
| import time | ||
| import os | ||
| from durabletask import client, task | ||
| from durabletask import client, task | ||
| from externalpackages.durabletaskscheduler.durabletask_scheduler_worker import DurableTaskSchedulerWorker | ||
| from externalpackages.durabletaskscheduler.durabletask_scheduler_client import DurableTaskSchedulerClient | ||
|
|
||
|
|
||
| def get_work_items(ctx: task.ActivityContext, _) -> list[str]: | ||
| """Activity function that returns a list of work items""" | ||
| # return a random number of work items | ||
| count = random.randint(2, 10) | ||
| print(f'generating {count} work items...') | ||
| return [f'work item {i}' for i in range(count)] | ||
|
|
||
|
|
||
| def process_work_item(ctx: task.ActivityContext, item: str) -> int: | ||
| """Activity function that returns a result for a given work item""" | ||
| print(f'processing work item: {item}') | ||
|
|
||
| # simulate some work that takes a variable amount of time | ||
| time.sleep(random.random() * 5) | ||
|
|
||
| # return a result for the given work item, which is also a random number in this case | ||
| return random.randint(0, 10) | ||
|
|
||
|
|
||
| def orchestrator(ctx: task.OrchestrationContext, _): | ||
| """Orchestrator function that calls the 'get_work_items' and 'process_work_item' | ||
| activity functions in parallel, waits for them all to complete, and prints | ||
| an aggregate summary of the outputs""" | ||
|
|
||
| work_items: list[str] = yield ctx.call_activity(get_work_items) | ||
|
|
||
| # execute the work-items in parallel and wait for them all to return | ||
| tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] | ||
| results: list[int] = yield task.when_all(tasks) | ||
|
|
||
| # return an aggregate summary of the results | ||
| return { | ||
| 'work_items': work_items, | ||
| 'results': results, | ||
| 'total': sum(results), | ||
| } | ||
|
|
||
|
|
||
| # Read the environment variable | ||
| taskhub_name = os.getenv("TASKHUB") | ||
|
|
||
| # Check if the variable exists | ||
| if taskhub_name: | ||
| print(f"The value of TASKHUB is: {taskhub_name}") | ||
| else: | ||
| print("TASKHUB is not set. Please set the TASKHUB environment variable to the name of the taskhub you wish to use") | ||
| print("If you are using windows powershell, run the following: $env:TASKHUB=\"<taskhubname>\"") | ||
| print("If you are using bash, run the following: export TASKHUB=\"<taskhubname>\"") | ||
| exit() | ||
|
|
||
| # Read the environment variable | ||
| endpoint = os.getenv("ENDPOINT") | ||
|
|
||
| # Check if the variable exists | ||
| if endpoint: | ||
| print(f"The value of ENDPOINT is: {endpoint}") | ||
| else: | ||
| print("ENDPOINT is not set. Please set the ENDPOINT environment variable to the endpoint of the scheduler") | ||
| print("If you are using windows powershell, run the following: $env:ENDPOINT=\"<schedulerEndpoint>\"") | ||
| print("If you are using bash, run the following: export ENDPOINT=\"<schedulerEndpoint>\"") | ||
| exit() | ||
|
|
||
| # configure and start the worker | ||
| with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name) as w: | ||
| w.add_orchestrator(orchestrator) | ||
| w.add_activity(process_work_item) | ||
| w.add_activity(get_work_items) | ||
| w.start() | ||
|
|
||
| # create a client, start an orchestration, and wait for it to finish | ||
| c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, taskhub=taskhub_name) | ||
| instance_id = c.schedule_new_orchestration(orchestrator) | ||
| state = c.wait_for_orchestration_completion(instance_id, timeout=30) | ||
| if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: | ||
| print(f'Orchestration completed! Result: {state.serialized_output}') | ||
| elif state: | ||
| print(f'Orchestration failed: {state.failure_details}') | ||
| exit() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| """Durable Task SDK for Python""" | ||
|
|
||
|
|
||
| PACKAGE_NAME = "durabletaskscheduler" |
59 changes: 59 additions & 0 deletions
59
externalpackages/durabletaskscheduler/access_token_manager.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
| from azure.identity import DefaultAzureCredential, ManagedIdentityCredential | ||
| from datetime import datetime, timedelta, timezone | ||
| from typing import Optional | ||
| import durabletask.internal.shared as shared | ||
|
|
||
| # By default, when there's 10minutes left before the token expires, refresh the token | ||
| class AccessTokenManager: | ||
| def __init__(self, refresh_buffer: int = 600, metadata: Optional[list[tuple[str, str]]] = None): | ||
| self.scope = "https://durabletask.io/.default" | ||
| self.refresh_buffer = refresh_buffer | ||
| self._use_managed_identity = False | ||
| self._metadata = metadata | ||
| self._client_id = None | ||
| self._logger = shared.get_logger("token_manager") | ||
|
|
||
| if metadata: # Ensure metadata is not None | ||
| for key, value in metadata: | ||
| if key == "use_managed_identity": | ||
| self._use_managed_identity = value.lower() == "true" # Properly convert string to bool | ||
| elif key == "client_id": | ||
| self._client_id = value # Directly assign string | ||
|
|
||
| # Choose the appropriate credential based on use_managed_identity | ||
| if self._use_managed_identity: | ||
| if not self._client_id: | ||
| self._logger.debug("Using System Assigned Managed Identity for authentication.") | ||
| self.credential = ManagedIdentityCredential() | ||
| else: | ||
| self._logger.debug("Using User Assigned Managed Identity for authentication.") | ||
| self.credential = ManagedIdentityCredential(client_id=self._client_id) | ||
| else: | ||
| self.credential = DefaultAzureCredential() | ||
| self._logger.debug("Using Default Azure Credentials for authentication.") | ||
|
|
||
| self.token = None | ||
| self.expiry_time = None | ||
|
|
||
| def get_access_token(self) -> str: | ||
| if self.token is None or self.is_token_expired(): | ||
| self.refresh_token() | ||
| return self.token | ||
|
|
||
| # Checks if the token is expired, or if it will expire in the next "refresh_buffer" seconds. | ||
| # For example, if the token is created to have a lifespan of 2 hours, and the refresh buffer is set to 30 minutes, | ||
| # We will grab a new token when there're 30minutes left on the lifespan of the token | ||
| def is_token_expired(self) -> bool: | ||
| if self.expiry_time is None: | ||
| return True | ||
| return datetime.now(timezone.utc) >= (self.expiry_time - timedelta(seconds=self.refresh_buffer)) | ||
|
|
||
| def refresh_token(self): | ||
| new_token = self.credential.get_token(self.scope) | ||
| self.token = f"Bearer {new_token.token}" | ||
|
|
||
| # Convert UNIX timestamp to timezone-aware datetime | ||
| self.expiry_time = datetime.fromtimestamp(new_token.expires_on, tz=timezone.utc) | ||
| self._logger.debug(f"Token refreshed. Expires at: {self.expiry_time}") |
29 changes: 29 additions & 0 deletions
29
externalpackages/durabletaskscheduler/durabletask_grpc_interceptor.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| from durabletask.internal.grpc_interceptor import _ClientCallDetails, DefaultClientInterceptorImpl | ||
| from externalpackages.durabletaskscheduler.access_token_manager import AccessTokenManager | ||
|
|
||
| import grpc | ||
|
|
||
| class DTSDefaultClientInterceptorImpl (DefaultClientInterceptorImpl): | ||
| """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, | ||
| StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an | ||
| interceptor to add additional headers to all calls as needed.""" | ||
|
|
||
| def __init__(self, metadata: list[tuple[str, str]]): | ||
| super().__init__(metadata) | ||
| self._token_manager = AccessTokenManager(metadata=self._metadata) | ||
|
|
||
| def _intercept_call( | ||
| self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails: | ||
| """Internal intercept_call implementation which adds metadata to grpc metadata in the RPC | ||
| call details.""" | ||
| # Refresh the auth token if it is present and needed | ||
| if self._metadata is not None: | ||
| for i, (key, _) in enumerate(self._metadata): | ||
| if key.lower() == "authorization": # Ensure case-insensitive comparison | ||
| new_token = self._token_manager.get_access_token() # Get the new token | ||
| self._metadata[i] = ("authorization", new_token) # Update the token | ||
|
|
||
| return super()._intercept_call(client_call_details) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.