Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
026c572
Creation of DTS example and passing of completionToken
RyanLettieri Jan 22, 2025
136a3d0
Adressing review feedback
RyanLettieri Jan 22, 2025
6df1064
Reverting dapr readme
RyanLettieri Jan 22, 2025
f731c0d
Adding accessTokenManager class for refreshing credential token
RyanLettieri Jan 24, 2025
eb98416
Adding comments to the example
RyanLettieri Jan 24, 2025
0de338d
Adding in requirement for azure-identity
RyanLettieri Jan 24, 2025
6050771
Moving dts logic into its own module
RyanLettieri Jan 28, 2025
f4f98ee
Fixing whitesapce
RyanLettieri Jan 28, 2025
ea837d0
Updating dts client to refresh token
RyanLettieri Jan 29, 2025
f8d79d3
Cleaning up construction of dts objects and improving examples
RyanLettieri Jan 29, 2025
1e67651
Migrating shared access token logic to new grpc class
RyanLettieri Feb 4, 2025
6b1bfd2
Adding log statements to access_token_manager
RyanLettieri Feb 5, 2025
bd56a35
breaking for loop when setting interceptors
RyanLettieri Feb 5, 2025
efc0146
Removing changes to client.py and adding additional steps to readme.md
RyanLettieri Feb 7, 2025
3fd0b08
Refactoring client and worker to pass around interceptors
RyanLettieri Feb 11, 2025
4260d02
Fixing import for DefaultClientInterceptorImpl
RyanLettieri Feb 11, 2025
ec4617c
Adressing round 1 of feedback
RyanLettieri Feb 11, 2025
ed733ea
Fixing interceptor issue
RyanLettieri Feb 12, 2025
99f62d7
Moving some files around to remove dependencies
RyanLettieri Feb 12, 2025
f9d55ab
Adressing more feedback
RyanLettieri Feb 12, 2025
ba1ac4f
More review feedback
RyanLettieri Feb 12, 2025
2c251ea
Passing token credential as an argument rather than 2 strings
RyanLettieri Feb 13, 2025
9c65176
More review feedback for token passing
RyanLettieri Feb 13, 2025
877dabb
Addressing None comment and using correct metadata
RyanLettieri Feb 13, 2025
b39ffad
Updating unit tests
RyanLettieri Feb 13, 2025
33c8b11
Fixing the type for the unit test
RyanLettieri Feb 13, 2025
1da819e
Fixing grpc calls
RyanLettieri Feb 13, 2025
f690264
Merge branch 'main' into durabletask-scheduler
RyanLettieri Feb 13, 2025
6142220
Fix linter errors and update documentation
cgillum Feb 14, 2025
58f4f93
Specifying version reqiuirement for pyproject.toml
RyanLettieri Feb 18, 2025
d82c1b7
Updating README
RyanLettieri Feb 18, 2025
b3a099e
Adding comment for credential type
RyanLettieri Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions durabletask-azuremanaged/durabletask/azuremanaged/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,13 @@ class DurableTaskSchedulerClient(TaskHubGrpcClient):
def __init__(self, *,
host_address: str,
taskhub: str,
secure_channel: Optional[bool] = True,
metadata: Optional[list[tuple[str, str]]] = None,
token_credential: Optional[TokenCredential] = None):
token_credential: TokenCredential = None,
secure_channel: Optional[bool] = True):

if taskhub == None:
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")

# Ensure metadata is a list
metadata = metadata or []
self._metadata = metadata.copy() # Use a copy to avoid modifying original

# Append DurableTask-specific metadata
self._metadata.append(("taskhub", taskhub))
self._metadata.append(("dts", "True"))
self._metadata.append(("token_credential", token_credential))
self._interceptors = [DTSDefaultClientInterceptorImpl(self._metadata)]
self._interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)]

# We pass in None for the metadata so we don't construct an additional interceptor in the parent class
# Since the parent class doesn't use anything metadata for anything else, we can set it as None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,23 @@

from durabletask.internal.grpc_interceptor import _ClientCallDetails, DefaultClientInterceptorImpl
from durabletask.azuremanaged.internal.access_token_manager import AccessTokenManager

from azure.core.credentials import TokenCredential
import grpc

class DTSDefaultClientInterceptorImpl (DefaultClientInterceptorImpl):
"""The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
interceptor to add additional headers to all calls as needed."""

def __init__(self, metadata: list[tuple[str, str]]):
def __init__(self, token_credential: TokenCredential, taskhub_name: str):
metadata = [("taskhub", taskhub_name)]
super().__init__(metadata)

self._token_credential = None

# Check what authentication we are using
if metadata:
for key, value in metadata:
if key.lower() == "token_credential":
self._token_credential = value

self._token_manager = AccessTokenManager(token_credential=self._token_credential)
token = self._token_manager.get_access_token()
self._metadata.append(("authorization", token))
if token_credential is not None:
self._token_credential = token_credential
self._token_manager = AccessTokenManager(token_credential=self._token_credential)
token = self._token_manager.get_access_token()
self._metadata.append(("authorization", token))

def _intercept_call(
self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,7 @@ def __init__(self, refresh_interval_seconds: int = 600, token_credential: TokenC
self._refresh_interval_seconds = refresh_interval_seconds
self._logger = shared.get_logger("token_manager")

# Choose the appropriate credential.
# Both TokenCredential and DefaultAzureCredential get_token methods return an AccessToken
if token_credential:
self._logger.debug("Using user provided token credentials.")
self._credential = token_credential
else:
self._credential = DefaultAzureCredential()
self._logger.debug("Using Default Azure Credentials for authentication.")
self._credential = token_credential

self._token = self._credential.get_token(self._scope)
self.expiry_time = None
Expand Down
15 changes: 3 additions & 12 deletions durabletask-azuremanaged/durabletask/azuremanaged/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,13 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
def __init__(self, *,
host_address: str,
taskhub: str,
secure_channel: Optional[bool] = True,
metadata: Optional[list[tuple[str, str]]] = None,
token_credential: Optional[TokenCredential] = None):
token_credential: TokenCredential = None,
secure_channel: Optional[bool] = True):

if taskhub == None:
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")

# Ensure metadata is a list
metadata = metadata or []
self._metadata = metadata.copy() # Copy to prevent modifying input

# Append DurableTask-specific metadata
self._metadata.append(("taskhub", taskhub))
self._metadata.append(("dts", "True"))
self._metadata.append(("token_credential", token_credential))
interceptors = [DTSDefaultClientInterceptorImpl(self._metadata)]
interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)]

# We pass in None for the metadata so we don't construct an additional interceptor in the parent class
# Since the parent class doesn't use anything metadata for anything else, we can set it as None
Expand Down
8 changes: 6 additions & 2 deletions examples/dts/dts_activity_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from durabletask import task
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
from durabletask.azuremanaged.client import DurableTaskSchedulerClient, OrchestrationStatus
from azure.identity import DefaultAzureCredential

def hello(ctx: task.ActivityContext, name: str) -> str:
"""Activity function that returns a greeting"""
Expand Down Expand Up @@ -45,15 +46,18 @@ def sequence(ctx: task.OrchestrationContext, _):
print("If you are using bash, run the following: export ENDPOINT=\"<schedulerEndpoint>\"")
exit()

credential = DefaultAzureCredential()

# configure and start the worker
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name) as w:
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
taskhub=taskhub_name, token_credential=credential) as w:
w.add_orchestrator(sequence)
w.add_activity(hello)
w.start()

# Construct the client and run the orchestrations
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, taskhub=taskhub_name)
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
taskhub=taskhub_name, token_credential=credential)
instance_id = c.schedule_new_orchestration(sequence)
state = c.wait_for_orchestration_completion(instance_id, timeout=60)
if state and state.runtime_status == OrchestrationStatus.COMPLETED:
Expand Down
9 changes: 7 additions & 2 deletions examples/dts/dts_fanout_fanin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from durabletask import client, task
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from azure.identity import DefaultAzureCredential

def get_work_items(ctx: task.ActivityContext, _) -> list[str]:
"""Activity function that returns a list of work items"""
Expand Down Expand Up @@ -71,15 +72,19 @@ def orchestrator(ctx: task.OrchestrationContext, _):
print("If you are using bash, run the following: export ENDPOINT=\"<schedulerEndpoint>\"")
exit()

credential = DefaultAzureCredential()

# configure and start the worker
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True, taskhub=taskhub_name) as w:
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
taskhub=taskhub_name, token_credential=credential) as w:
w.add_orchestrator(orchestrator)
w.add_activity(process_work_item)
w.add_activity(get_work_items)
w.start()

# create a client, start an orchestration, and wait for it to finish
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, taskhub=taskhub_name)
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
taskhub=taskhub_name, token_credential=credential)
instance_id = c.schedule_new_orchestration(orchestrator)
state = c.wait_for_orchestration_completion(instance_id, timeout=30)
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
Expand Down
Loading