Skip to content

Commit 3044d28

Browse files
committed
final touches
1 parent 66cbcb2 commit 3044d28

File tree

3 files changed

+128
-11
lines changed

3 files changed

+128
-11
lines changed

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,59 @@
55

66
from azure.core.credentials import TokenCredential
77

8-
from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import (
9-
DTSDefaultClientInterceptorImpl,
10-
)
11-
from durabletask.worker import TaskHubGrpcWorker
8+
from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
9+
DTSDefaultClientInterceptorImpl
10+
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker
1211

1312

1413
# Worker class used for Durable Task Scheduler (DTS)
1514
class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
15+
"""A worker implementation for Azure Durable Task Scheduler (DTS).
16+
17+
This class extends TaskHubGrpcWorker to provide integration with Azure's
18+
Durable Task Scheduler service. It handles authentication via Azure credentials
19+
and configures the necessary gRPC interceptors for DTS communication.
20+
21+
Args:
22+
host_address (str): The gRPC endpoint address of the DTS service.
23+
taskhub (str): The name of the task hub. Cannot be empty.
24+
token_credential (Optional[TokenCredential]): Azure credential for authentication.
25+
If None, anonymous authentication will be used.
26+
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
27+
Defaults to True.
28+
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
29+
for controlling worker concurrency limits. If None, default concurrency
30+
settings will be used.
31+
32+
Raises:
33+
ValueError: If taskhub is empty or None.
34+
35+
Example:
36+
>>> from azure.identity import DefaultAzureCredential
37+
>>> from durabletask.azuremanaged import DurableTaskSchedulerWorker
38+
>>> from durabletask import ConcurrencyOptions
39+
>>>
40+
>>> credential = DefaultAzureCredential()
41+
>>> concurrency = ConcurrencyOptions(max_concurrent_activities=10)
42+
>>> worker = DurableTaskSchedulerWorker(
43+
... host_address="my-dts-service.azure.com:443",
44+
... taskhub="my-task-hub",
45+
... token_credential=credential,
46+
... concurrency_options=concurrency
47+
... )
48+
49+
Note:
50+
This worker automatically configures DTS-specific gRPC interceptors
51+
for authentication and task hub routing. The parent class metadata
52+
parameter is set to None since authentication is handled by the
53+
DTS interceptor.
54+
"""
1655
def __init__(self, *,
1756
host_address: str,
1857
taskhub: str,
1958
token_credential: Optional[TokenCredential],
20-
secure_channel: bool = True):
59+
secure_channel: bool = True,
60+
concurrency_options: Optional[ConcurrencyOptions] = None):
2161

2262
if not taskhub:
2363
raise ValueError("The taskhub value cannot be empty.")
@@ -30,4 +70,5 @@ def __init__(self, *,
3070
host_address=host_address,
3171
secure_channel=secure_channel,
3272
metadata=None,
33-
interceptors=interceptors)
73+
interceptors=interceptors,
74+
concurrency_options=concurrency_options)

durabletask-azuremanaged/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta"
99

1010
[project]
1111
name = "durabletask.azuremanaged"
12-
version = "0.1.5"
12+
version = "0.2.0"
1313
description = "Durable Task Python SDK provider implementation for the Azure Durable Task Scheduler"
1414
keywords = [
1515
"durable",

durabletask/worker.py

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@
2828

2929

3030
class ConcurrencyOptions:
31-
"""Configuration options for controlling concurrency of different work item types.
31+
"""Configuration options for controlling concurrency of different work item types and the thread pool size.
3232
33-
This class mirrors the .NET DurableTask SDK's ConcurrencyOptions class,
34-
providing fine-grained control over concurrent processing limits for
35-
activities, orchestrations, and entities.
33+
This class provides fine-grained control over concurrent processing limits for
34+
activities, orchestrations and the thread pool size.
3635
"""
3736

3837
def __init__(
@@ -134,6 +133,83 @@ class ActivityNotRegisteredError(ValueError):
134133

135134

136135
class TaskHubGrpcWorker:
136+
"""A gRPC-based worker for processing durable task orchestrations and activities.
137+
138+
This worker connects to a Durable Task backend service via gRPC to receive and process
139+
work items including orchestration functions and activity functions. It provides
140+
concurrent execution capabilities with configurable limits and automatic retry handling.
141+
142+
The worker manages the complete lifecycle:
143+
- Registers orchestrator and activity functions
144+
- Connects to the gRPC backend service
145+
- Receives work items and executes them concurrently
146+
- Handles failures, retries, and state management
147+
- Provides logging and monitoring capabilities
148+
149+
Args:
150+
host_address (Optional[str], optional): The gRPC endpoint address of the backend service.
151+
Defaults to the value from environment variables or localhost.
152+
metadata (Optional[list[tuple[str, str]]], optional): gRPC metadata to include with
153+
requests. Used for authentication and routing. Defaults to None.
154+
log_handler (optional): Custom logging handler for worker logs. Defaults to None.
155+
log_formatter (Optional[logging.Formatter], optional): Custom log formatter.
156+
Defaults to None.
157+
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
158+
Defaults to False.
159+
interceptors (Optional[Sequence[shared.ClientInterceptor]], optional): Custom gRPC
160+
interceptors to apply to the channel. Defaults to None.
161+
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration for
162+
controlling worker concurrency limits. If None, default settings are used.
163+
164+
Attributes:
165+
concurrency_options (ConcurrencyOptions): The current concurrency configuration.
166+
167+
Example:
168+
Basic worker setup:
169+
170+
>>> from durabletask import TaskHubGrpcWorker, ConcurrencyOptions
171+
>>>
172+
>>> # Create worker with custom concurrency settings
173+
>>> concurrency = ConcurrencyOptions(
174+
... maximum_concurrent_activity_work_items=50,
175+
... maximum_concurrent_orchestration_work_items=20
176+
... )
177+
>>> worker = TaskHubGrpcWorker(
178+
... host_address="localhost:4001",
179+
... concurrency_options=concurrency
180+
... )
181+
>>>
182+
>>> # Register functions
183+
>>> @worker.add_orchestrator
184+
... def my_orchestrator(context, input):
185+
... result = yield context.call_activity("my_activity", input="hello")
186+
... return result
187+
>>>
188+
>>> @worker.add_activity
189+
... def my_activity(context, input):
190+
... return f"Processed: {input}"
191+
>>>
192+
>>> # Start the worker
193+
>>> worker.start()
194+
>>> # ... worker runs in background thread
195+
>>> worker.stop()
196+
197+
Using as context manager:
198+
199+
>>> with TaskHubGrpcWorker() as worker:
200+
... worker.add_orchestrator(my_orchestrator)
201+
... worker.add_activity(my_activity)
202+
... worker.start()
203+
... # Worker automatically stops when exiting context
204+
205+
Raises:
206+
RuntimeError: If attempting to add orchestrators/activities while the worker is running,
207+
or if starting a worker that is already running.
208+
OrchestratorNotRegisteredError: If an orchestration work item references an
209+
unregistered orchestrator function.
210+
ActivityNotRegisteredError: If an activity work item references an unregistered
211+
activity function.
212+
"""
137213
_response_stream: Optional[grpc.Future] = None
138214
_interceptors: Optional[list[shared.ClientInterceptor]] = None
139215

0 commit comments

Comments
 (0)