Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ jobs:
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install durabletask dependencies
- name: Install durabletask dependencies and the library itself in editable mode
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
pip install -r requirements.txt
pip install -e .
- name: Install durabletask-azuremanaged dependencies
working-directory: examples/dts
run: |
Expand Down
14 changes: 12 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,23 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v0.2.0 (Unreleased)
## v0.3.0

### New

- Added `ConcurrencyOptions` class for fine-grained concurrency control with separate limits for activities and orchestrations. The thread pool worker count can also be configured.

### Fixed

- Fixed an issue where a worker could not recover after its connection was interrupted or severed

## v0.2.1

### New

- Added `set_custom_status` orchestrator API ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)
- Added `purge_orchestration` client API ([#34](https://github.com/microsoft/durabletask-python/pull/34)) - contributed by [@famarting](https://github.com/famarting)
- Added new `durabletask-azuremanaged` package for use with the [Durable Task Scheduler](https://techcommunity.microsoft.com/blog/appsonazureblog/announcing-limited-early-access-of-the-durable-task-scheduler-for-azure-durable-/4286526) - by [@RyanLettieri](https://github.com/RyanLettieri)
- Added new `durabletask-azuremanaged` package for use with the [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) - by [@RyanLettieri](https://github.com/RyanLettieri)

### Changes

Expand Down
53 changes: 47 additions & 6 deletions durabletask-azuremanaged/durabletask/azuremanaged/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,59 @@

from azure.core.credentials import TokenCredential

from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import (
DTSDefaultClientInterceptorImpl,
)
from durabletask.worker import TaskHubGrpcWorker
from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import \
DTSDefaultClientInterceptorImpl
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker


# Worker class used for Durable Task Scheduler (DTS)
class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
"""A worker implementation for Azure Durable Task Scheduler (DTS).
This class extends TaskHubGrpcWorker to provide integration with Azure's
Durable Task Scheduler service. It handles authentication via Azure credentials
and configures the necessary gRPC interceptors for DTS communication.
Args:
host_address (str): The gRPC endpoint address of the DTS service.
taskhub (str): The name of the task hub. Cannot be empty.
token_credential (Optional[TokenCredential]): Azure credential for authentication.
If None, anonymous authentication will be used.
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
Defaults to True.
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
for controlling worker concurrency limits. If None, default concurrency
settings will be used.
Raises:
ValueError: If taskhub is empty or None.
Example:
>>> from azure.identity import DefaultAzureCredential
>>> from durabletask.azuremanaged import DurableTaskSchedulerWorker
>>> from durabletask.worker import ConcurrencyOptions
>>>
>>> credential = DefaultAzureCredential()
>>> concurrency = ConcurrencyOptions(max_concurrent_activities=10)
>>> worker = DurableTaskSchedulerWorker(
... host_address="my-dts-service.azure.com:443",
... taskhub="my-task-hub",
... token_credential=credential,
... concurrency_options=concurrency
... )
Note:
This worker automatically configures DTS-specific gRPC interceptors
for authentication and task hub routing. The parent class metadata
parameter is set to None since authentication is handled by the
DTS interceptor.
"""
def __init__(self, *,
host_address: str,
taskhub: str,
token_credential: Optional[TokenCredential],
secure_channel: bool = True):
secure_channel: bool = True,
concurrency_options: Optional[ConcurrencyOptions] = None):

if not taskhub:
raise ValueError("The taskhub value cannot be empty.")
Expand All @@ -30,4 +70,5 @@ def __init__(self, *,
host_address=host_address,
secure_channel=secure_channel,
metadata=None,
interceptors=interceptors)
interceptors=interceptors,
concurrency_options=concurrency_options)
6 changes: 3 additions & 3 deletions durabletask-azuremanaged/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ build-backend = "setuptools.build_meta"

[project]
name = "durabletask.azuremanaged"
version = "0.1.4"
description = "Extensions for the Durable Task Python SDK for integrating with the Durable Task Scheduler in Azure"
version = "0.2.0"
description = "Durable Task Python SDK provider implementation for the Azure Durable Task Scheduler"
keywords = [
"durable",
"task",
Expand All @@ -26,7 +26,7 @@ requires-python = ">=3.9"
license = {file = "LICENSE"}
readme = "README.md"
dependencies = [
"durabletask>=0.2.1",
"durabletask>=0.3.0",
"azure-identity>=1.19.0"
]

Expand Down
3 changes: 3 additions & 0 deletions durabletask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@

"""Durable Task SDK for Python"""

from durabletask.worker import ConcurrencyOptions

__all__ = ["ConcurrencyOptions"]

PACKAGE_NAME = "durabletask"
Loading
Loading