Skip to content

Commit 6142220

Browse files
committed
Fix linter errors and update documentation
1 parent f690264 commit 6142220

File tree

15 files changed

+168
-138
lines changed

15 files changed

+168
-138
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
- Added `set_custom_status` orchestrator API ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)
1313
- Added `purge_orchestration` client API ([#34](https://github.com/microsoft/durabletask-python/pull/34)) - contributed by [@famarting](https://github.com/famarting)
14+
- Added new `durabletask-azuremanaged` package for use with the [Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) - by [@RyanLettieri](https://github.com/RyanLettieri)
1415

1516
### Changes
1617

README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
# Durable Task Client SDK for Python
1+
# Durable Task SDK for Python
22

33
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
44
[![Build Validation](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml/badge.svg)](https://github.com/microsoft/durabletask-python/actions/workflows/pr-validation.yml)
55
[![PyPI version](https://badge.fury.io/py/durabletask.svg)](https://badge.fury.io/py/durabletask)
66

7-
This repo contains a Python client SDK for use with the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go) and [Dapr Workflow](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code.
7+
This repo contains a Python SDK for use with the [Azure Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) and the [Durable Task Framework for Go](https://github.com/microsoft/durabletask-go). With this SDK, you can define, schedule, and manage durable orchestrations using ordinary Python code.
88

99
⚠️ **This SDK is currently under active development and is not yet ready for production use.** ⚠️
1010

11-
> Note that this project is **not** currently affiliated with the [Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) project for Azure Functions. If you are looking for a Python SDK for Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).
12-
11+
> Note that this SDK is **not** currently compatible with [Azure Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview). If you are looking for a Python SDK for Azure Durable Functions, please see [this repo](https://github.com/Azure/azure-functions-durable-python).
1312
1413
## Supported patterns
1514

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,30 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4-
from typing import Optional
5-
from durabletask.client import TaskHubGrpcClient, OrchestrationStatus
6-
from durabletask.azuremanaged.internal.access_token_manager import AccessTokenManager
7-
from durabletask.azuremanaged.durabletask_grpc_interceptor import DTSDefaultClientInterceptorImpl
84
from azure.core.credentials import TokenCredential
95

6+
from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
7+
DTSDefaultClientInterceptorImpl
8+
from durabletask.client import TaskHubGrpcClient
9+
10+
1011
# Client class used for Durable Task Scheduler (DTS)
1112
class DurableTaskSchedulerClient(TaskHubGrpcClient):
1213
def __init__(self, *,
1314
host_address: str,
1415
taskhub: str,
1516
token_credential: TokenCredential,
16-
secure_channel: Optional[bool] = True):
17+
secure_channel: bool = True):
1718

18-
if taskhub == None:
19+
if not taskhub:
1920
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
2021

21-
self._interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)]
22+
interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)]
2223

2324
# We pass in None for the metadata so we don't construct an additional interceptor in the parent class
2425
# Since the parent class doesn't use anything metadata for anything else, we can set it as None
2526
super().__init__(
2627
host_address=host_address,
2728
secure_channel=secure_channel,
2829
metadata=None,
29-
interceptors=self._interceptors)
30+
interceptors=interceptors)
Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,33 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
3-
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
43
from datetime import datetime, timedelta, timezone
54
from typing import Optional
5+
6+
from azure.core.credentials import AccessToken, TokenCredential
7+
68
import durabletask.internal.shared as shared
7-
from azure.core.credentials import TokenCredential
9+
810

911
# By default, when there's 10minutes left before the token expires, refresh the token
1012
class AccessTokenManager:
11-
def __init__(self, refresh_interval_seconds: int = 600, token_credential: TokenCredential = None):
13+
14+
_token: Optional[AccessToken]
15+
16+
def __init__(self, token_credential: Optional[TokenCredential], refresh_interval_seconds: int = 600):
1217
self._scope = "https://durabletask.io/.default"
1318
self._refresh_interval_seconds = refresh_interval_seconds
1419
self._logger = shared.get_logger("token_manager")
1520

1621
self._credential = token_credential
17-
18-
self._token = self._credential.get_token(self._scope)
19-
self.expiry_time = None
2022

21-
def get_access_token(self) -> str:
23+
if self._credential is not None:
24+
self._token = self._credential.get_token(self._scope)
25+
self.expiry_time = datetime.fromtimestamp(self._token.expires_on, tz=timezone.utc)
26+
else:
27+
self._token = None
28+
self.expiry_time = None
29+
30+
def get_access_token(self) -> Optional[AccessToken]:
2231
if self._token is None or self.is_token_expired():
2332
self.refresh_token()
2433
return self._token
@@ -32,9 +41,9 @@ def is_token_expired(self) -> bool:
3241
return datetime.now(timezone.utc) >= (self.expiry_time - timedelta(seconds=self._refresh_interval_seconds))
3342

3443
def refresh_token(self):
35-
new_token = self._credential.get_token(self._scope)
36-
self._token = f"Bearer {new_token.token}"
37-
38-
# Convert UNIX timestamp to timezone-aware datetime
39-
self.expiry_time = datetime.fromtimestamp(new_token.expires_on, tz=timezone.utc)
40-
self._logger.debug(f"Token refreshed. Expires at: {self.expiry_time}")
44+
if self._credential is not None:
45+
self._token = self._credential.get_token(self._scope)
46+
47+
# Convert UNIX timestamp to timezone-aware datetime
48+
self.expiry_time = datetime.fromtimestamp(self._token.expires_on, tz=timezone.utc)
49+
self._logger.debug(f"Token refreshed. Expires at: {self.expiry_time}")
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,41 @@
1-
# Copyright (c) Microsoft Corporation.
2-
# Licensed under the MIT License.
3-
4-
from durabletask.internal.grpc_interceptor import _ClientCallDetails, DefaultClientInterceptorImpl
5-
from durabletask.azuremanaged.internal.access_token_manager import AccessTokenManager
6-
from azure.core.credentials import TokenCredential
7-
import grpc
8-
9-
class DTSDefaultClientInterceptorImpl (DefaultClientInterceptorImpl):
10-
"""The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
11-
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
12-
interceptor to add additional headers to all calls as needed."""
13-
14-
def __init__(self, token_credential: TokenCredential, taskhub_name: str):
15-
self._metadata = [("taskhub", taskhub_name)]
16-
super().__init__(self._metadata)
17-
18-
if token_credential is not None:
19-
self._token_credential = token_credential
20-
self._token_manager = AccessTokenManager(token_credential=self._token_credential)
21-
token = self._token_manager.get_access_token()
22-
self._metadata.append(("authorization", token))
23-
24-
def _intercept_call(
25-
self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
26-
"""Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
27-
call details."""
28-
# Refresh the auth token if it is present and needed
29-
if self._metadata is not None:
30-
for i, (key, _) in enumerate(self._metadata):
31-
if key.lower() == "authorization": # Ensure case-insensitive comparison
32-
new_token = self._token_manager.get_access_token() # Get the new token
33-
self._metadata[i] = ("authorization", new_token) # Update the token
34-
35-
return super()._intercept_call(client_call_details)
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
import grpc
5+
from azure.core.credentials import TokenCredential
6+
7+
from durabletask.azuremanaged.internal.access_token_manager import \
8+
AccessTokenManager
9+
from durabletask.internal.grpc_interceptor import (
10+
DefaultClientInterceptorImpl, _ClientCallDetails)
11+
12+
13+
class DTSDefaultClientInterceptorImpl (DefaultClientInterceptorImpl):
14+
"""The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
15+
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
16+
interceptor to add additional headers to all calls as needed."""
17+
18+
def __init__(self, token_credential: TokenCredential, taskhub_name: str):
19+
self._metadata = [("taskhub", taskhub_name)]
20+
super().__init__(self._metadata)
21+
22+
if token_credential is not None:
23+
self._token_credential = token_credential
24+
self._token_manager = AccessTokenManager(token_credential=self._token_credential)
25+
access_token = self._token_manager.get_access_token()
26+
if access_token is not None:
27+
self._metadata.append(("authorization", f"Bearer {access_token.token}"))
28+
29+
def _intercept_call(
30+
self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
31+
"""Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
32+
call details."""
33+
# Refresh the auth token if it is present and needed
34+
if self._metadata is not None:
35+
for i, (key, _) in enumerate(self._metadata):
36+
if key.lower() == "authorization": # Ensure case-insensitive comparison
37+
new_token = self._token_manager.get_access_token() # Get the new token
38+
if new_token is not None:
39+
self._metadata[i] = ("authorization", f"Bearer {new_token.token}") # Update the token
40+
41+
return super()._intercept_call(client_call_details)
Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4-
from typing import Optional
5-
from durabletask.worker import TaskHubGrpcWorker
6-
from durabletask.azuremanaged.internal.access_token_manager import AccessTokenManager
7-
from durabletask.azuremanaged.durabletask_grpc_interceptor import DTSDefaultClientInterceptorImpl
84
from azure.core.credentials import TokenCredential
95

6+
from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
7+
DTSDefaultClientInterceptorImpl
8+
from durabletask.worker import TaskHubGrpcWorker
9+
10+
1011
# Worker class used for Durable Task Scheduler (DTS)
1112
class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
1213
def __init__(self, *,
1314
host_address: str,
1415
taskhub: str,
1516
token_credential: TokenCredential,
16-
secure_channel: Optional[bool] = True):
17-
18-
if taskhub == None:
19-
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
17+
secure_channel: bool = True):
18+
19+
if not taskhub:
20+
raise ValueError("The taskhub value cannot be empty.")
2021

2122
interceptors = [DTSDefaultClientInterceptorImpl(token_credential, taskhub)]
2223

@@ -25,5 +26,5 @@ def __init__(self, *,
2526
super().__init__(
2627
host_address=host_address,
2728
secure_channel=secure_channel,
28-
metadata=None,
29-
interceptors=interceptors)
29+
metadata=None,
30+
interceptors=interceptors)

durabletask/client.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from dataclasses import dataclass
77
from datetime import datetime
88
from enum import Enum
9-
from typing import Any, Optional, TypeVar, Union
9+
from typing import Any, Optional, Sequence, TypeVar, Union
1010

1111
import grpc
1212
from google.protobuf import wrappers_pb2
@@ -15,9 +15,8 @@
1515
import durabletask.internal.orchestrator_service_pb2 as pb
1616
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
1717
import durabletask.internal.shared as shared
18-
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
19-
2018
from durabletask import task
19+
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
2120

2221
TInput = TypeVar('TInput')
2322
TOutput = TypeVar('TOutput')
@@ -99,22 +98,23 @@ def __init__(self, *,
9998
log_handler: Optional[logging.Handler] = None,
10099
log_formatter: Optional[logging.Formatter] = None,
101100
secure_channel: bool = False,
102-
interceptors: Optional[list[Union[grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor]]] = None):
101+
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None):
103102

104-
# Determine the interceptors to use
103+
# If the caller provided metadata, we need to create a new interceptor for it and
104+
# add it to the list of interceptors.
105105
if interceptors is not None:
106-
self._interceptors = interceptors
107-
if metadata:
108-
self._interceptors.append(DefaultClientInterceptorImpl(metadata))
109-
elif metadata:
110-
self._interceptors = [DefaultClientInterceptorImpl(metadata)]
106+
interceptors = list(interceptors)
107+
if metadata is not None:
108+
interceptors.append(DefaultClientInterceptorImpl(metadata))
109+
elif metadata is not None:
110+
interceptors = [DefaultClientInterceptorImpl(metadata)]
111111
else:
112-
self._interceptors = None
112+
interceptors = None
113113

114114
channel = shared.get_grpc_channel(
115115
host_address=host_address,
116116
secure_channel=secure_channel,
117-
interceptors=self._interceptors
117+
interceptors=interceptors
118118
)
119119
self._stub = stubs.TaskHubSidecarServiceStub(channel)
120120
self._logger = shared.get_logger("client", log_handler, log_formatter)
@@ -134,7 +134,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
134134
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
135135
version=wrappers_pb2.StringValue(value=""),
136136
orchestrationIdReusePolicy=reuse_id_policy,
137-
)
137+
)
138138

139139
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
140140
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)

durabletask/internal/grpc_interceptor.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,28 @@ class _ClientCallDetails(
1919

2020

2121
class DefaultClientInterceptorImpl (
22-
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
23-
grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
22+
grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor,
23+
grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
2424
"""The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
25-
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
25+
StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
2626
interceptor to add additional headers to all calls as needed."""
2727

2828
def __init__(self, metadata: list[tuple[str, str]]):
2929
super().__init__()
3030
self._metadata = metadata
3131

3232
def _intercept_call(
33-
self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
33+
self, client_call_details: _ClientCallDetails) -> grpc.ClientCallDetails:
3434
"""Internal intercept_call implementation which adds metadata to grpc metadata in the RPC
3535
call details."""
3636
if self._metadata is None:
3737
return client_call_details
38-
38+
3939
if client_call_details.metadata is not None:
4040
metadata = list(client_call_details.metadata)
4141
else:
4242
metadata = []
43-
43+
4444
metadata.extend(self._metadata)
4545
client_call_details = _ClientCallDetails(
4646
client_call_details.method, client_call_details.timeout, metadata,

durabletask/internal/shared.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,17 @@
55
import json
66
import logging
77
from types import SimpleNamespace
8-
from typing import Any, Optional
9-
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
8+
from typing import Any, Optional, Sequence, Union
109

1110
import grpc
1211

12+
ClientInterceptor = Union[
13+
grpc.UnaryUnaryClientInterceptor,
14+
grpc.UnaryStreamClientInterceptor,
15+
grpc.StreamUnaryClientInterceptor,
16+
grpc.StreamStreamClientInterceptor
17+
]
18+
1319
# Field name used to indicate that an object was automatically serialized
1420
# and should be deserialized as a SimpleNamespace
1521
AUTO_SERIALIZED = "__durabletask_autoobject__"
@@ -25,8 +31,8 @@ def get_default_host_address() -> str:
2531
def get_grpc_channel(
2632
host_address: Optional[str],
2733
secure_channel: bool = False,
28-
interceptors: Optional[list[DefaultClientInterceptorImpl]] = None) -> grpc.Channel:
29-
34+
interceptors: Optional[Sequence[ClientInterceptor]] = None) -> grpc.Channel:
35+
3036
if host_address is None:
3137
host_address = get_default_host_address()
3238

@@ -55,6 +61,7 @@ def get_grpc_channel(
5561
channel = grpc.intercept_channel(channel, *interceptors)
5662
return channel
5763

64+
5865
def get_logger(
5966
name_suffix: str,
6067
log_handler: Optional[logging.Handler] = None,
@@ -99,7 +106,7 @@ def default(self, obj):
99106
if dataclasses.is_dataclass(obj):
100107
# Dataclasses are not serializable by default, so we convert them to a dict and mark them for
101108
# automatic deserialization by the receiver
102-
d = dataclasses.asdict(obj) # type: ignore
109+
d = dataclasses.asdict(obj) # type: ignore
103110
d[AUTO_SERIALIZED] = True
104111
return d
105112
elif isinstance(obj, SimpleNamespace):

0 commit comments

Comments
 (0)