Skip to content

Commit b9ac4ff

Browse files
authored
Merge branch 'main' into andystaples/abandon-failed-workitems
2 parents 7d394aa + 5b453ed commit b9ac4ff

File tree

11 files changed

+124
-23
lines changed

11 files changed

+124
-23
lines changed

.github/workflows/durabletask-azuremanaged.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ jobs:
1515
runs-on: ubuntu-latest
1616
steps:
1717
- uses: actions/checkout@v4
18-
- name: Set up Python 3.13
18+
- name: Set up Python 3.14
1919
uses: actions/setup-python@v5
2020
with:
21-
python-version: 3.13
21+
python-version: 3.14
2222
- name: Install dependencies
2323
working-directory: durabletask-azuremanaged
2424
run: |
@@ -36,7 +36,7 @@ jobs:
3636
strategy:
3737
fail-fast: false
3838
matrix:
39-
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
39+
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
4040
env:
4141
EMULATOR_VERSION: "latest"
4242
needs: lint
@@ -100,7 +100,7 @@ jobs:
100100
- name: Set up Python
101101
uses: actions/setup-python@v5
102102
with:
103-
python-version: "3.13" # Adjust Python version as needed
103+
python-version: "3.14" # Adjust Python version as needed
104104

105105
- name: Install dependencies
106106
run: |

.github/workflows/durabletask.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ jobs:
1515
runs-on: ubuntu-latest
1616
steps:
1717
- uses: actions/checkout@v4
18-
- name: Set up Python 3.13
18+
- name: Set up Python 3.14
1919
uses: actions/setup-python@v5
2020
with:
21-
python-version: 3.13
21+
python-version: 3.14
2222
- name: Install dependencies
2323
run: |
2424
python -m pip install --upgrade pip
@@ -38,7 +38,7 @@ jobs:
3838
strategy:
3939
fail-fast: false
4040
matrix:
41-
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
41+
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
4242
needs: lint-and-unit-tests
4343
runs-on: ubuntu-latest
4444
steps:
@@ -85,7 +85,7 @@ jobs:
8585
- name: Set up Python
8686
uses: actions/setup-python@v5
8787
with:
88-
python-version: "3.13" # Adjust Python version as needed
88+
python-version: "3.14" # Adjust Python version as needed
8989

9090
- name: Install dependencies
9191
run: |

docs/features.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,21 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and
148148

149149
### Retry policies
150150

151-
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
151+
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
152+
153+
### Logging configuration
154+
155+
Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and DurableTaskSchedulerClient for durabletask-azuremanaged) accept `log_handler` and `log_formatter` objects from the `logging` module. These can be used to customize the verbosity, output location, and format of logs emitted by these sources.
156+
157+
For example, to output logs to a file called `durable.log` at level `DEBUG`, you could use the following:
158+
159+
```python
160+
log_handler = logging.FileHandler('durable.log', encoding='utf-8')
161+
log_handler.setLevel(logging.DEBUG)
162+
163+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
164+
taskhub=taskhub_name, token_credential=credential, log_handler=log_handler) as w:
165+
```
166+
167+
**NOTE**
168+
The worker and client output many logs at the `DEBUG` level that are useful for understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt to reproduce the issue with debug logging enabled.

durabletask-azuremanaged/durabletask/azuremanaged/client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
import logging
5+
46
from typing import Optional
57

68
from azure.core.credentials import TokenCredential
@@ -18,7 +20,9 @@ def __init__(self, *,
1820
taskhub: str,
1921
token_credential: Optional[TokenCredential],
2022
secure_channel: bool = True,
21-
default_version: Optional[str] = None):
23+
default_version: Optional[str] = None,
24+
log_handler: Optional[logging.Handler] = None,
25+
log_formatter: Optional[logging.Formatter] = None):
2226

2327
if not taskhub:
2428
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
@@ -31,5 +35,7 @@ def __init__(self, *,
3135
host_address=host_address,
3236
secure_channel=secure_channel,
3337
metadata=None,
38+
log_handler=log_handler,
39+
log_formatter=log_formatter,
3440
interceptors=interceptors,
3541
default_version=default_version)

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
import logging
5+
46
from typing import Optional
57

68
from azure.core.credentials import TokenCredential
@@ -28,6 +30,8 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
2830
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
2931
for controlling worker concurrency limits. If None, default concurrency
3032
settings will be used.
33+
log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs.
34+
log_formatter (Optional[logging.Formatter], optional): Custom log formatter for worker logs.
3135
3236
Raises:
3337
ValueError: If taskhub is empty or None.
@@ -52,12 +56,15 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
5256
parameter is set to None since authentication is handled by the
5357
DTS interceptor.
5458
"""
59+
5560
def __init__(self, *,
5661
host_address: str,
5762
taskhub: str,
5863
token_credential: Optional[TokenCredential],
5964
secure_channel: bool = True,
60-
concurrency_options: Optional[ConcurrencyOptions] = None):
65+
concurrency_options: Optional[ConcurrencyOptions] = None,
66+
log_handler: Optional[logging.Handler] = None,
67+
log_formatter: Optional[logging.Formatter] = None):
6168

6269
if not taskhub:
6370
raise ValueError("The taskhub value cannot be empty.")
@@ -70,5 +77,7 @@ def __init__(self, *,
7077
host_address=host_address,
7178
secure_channel=secure_channel,
7279
metadata=None,
80+
log_handler=log_handler,
81+
log_formatter=log_formatter,
7382
interceptors=interceptors,
7483
concurrency_options=concurrency_options)

durabletask-azuremanaged/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ classifiers = [
2222
"Programming Language :: Python :: 3",
2323
"License :: OSI Approved :: MIT License",
2424
]
25-
requires-python = ">=3.9"
25+
requires-python = ">=3.10"
2626
license = {file = "LICENSE"}
2727
readme = "README.md"
2828
dependencies = [

durabletask/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def lock_entities(self, entities: list[EntityInstanceId]) -> Task[EntityLock]:
201201
pass
202202

203203
@abstractmethod
204-
def call_sub_orchestrator(self, orchestrator: Orchestrator[TInput, TOutput], *,
204+
def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput], str], *,
205205
input: Optional[TInput] = None,
206206
instance_id: Optional[str] = None,
207207
retry_policy: Optional[RetryPolicy] = None,

durabletask/worker.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ class TaskHubGrpcWorker:
246246
Defaults to the value from environment variables or localhost.
247247
metadata (Optional[list[tuple[str, str]]], optional): gRPC metadata to include with
248248
requests. Used for authentication and routing. Defaults to None.
249-
log_handler (optional): Custom logging handler for worker logs. Defaults to None.
249+
log_handler (optional[logging.Handler]): Custom logging handler for worker logs. Defaults to None.
250250
log_formatter (Optional[logging.Formatter], optional): Custom log formatter.
251251
Defaults to None.
252252
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
@@ -314,7 +314,7 @@ def __init__(
314314
*,
315315
host_address: Optional[str] = None,
316316
metadata: Optional[list[tuple[str, str]]] = None,
317-
log_handler=None,
317+
log_handler: Optional[logging.Handler] = None,
318318
log_formatter: Optional[logging.Formatter] = None,
319319
secure_channel: bool = False,
320320
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
@@ -1072,15 +1072,18 @@ def lock_entities(self, entities: list[EntityInstanceId]) -> task.Task[EntityLoc
10721072

10731073
def call_sub_orchestrator(
10741074
self,
1075-
orchestrator: task.Orchestrator[TInput, TOutput],
1075+
orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
10761076
*,
10771077
input: Optional[TInput] = None,
10781078
instance_id: Optional[str] = None,
10791079
retry_policy: Optional[task.RetryPolicy] = None,
10801080
version: Optional[str] = None,
10811081
) -> task.Task[TOutput]:
10821082
id = self.next_sequence_number()
1083-
orchestrator_name = task.get_name(orchestrator)
1083+
if isinstance(orchestrator, str):
1084+
orchestrator_name = orchestrator
1085+
else:
1086+
orchestrator_name = task.get_name(orchestrator)
10841087
default_version = self._registry.versioning.default_version if self._registry.versioning else None
10851088
orchestrator_version = version if version else default_version
10861089
self.call_activity_function_helper(
@@ -1276,13 +1279,21 @@ def execute(
12761279
old_events: Sequence[pb.HistoryEvent],
12771280
new_events: Sequence[pb.HistoryEvent],
12781281
) -> ExecutionResults:
1282+
orchestration_name = "<unknown>"
1283+
orchestration_started_events = [e for e in old_events if e.HasField("executionStarted")]
1284+
if len(orchestration_started_events) >= 1:
1285+
orchestration_name = orchestration_started_events[0].executionStarted.name
1286+
1287+
self._logger.debug(
1288+
f"{instance_id}: Beginning replay for orchestrator {orchestration_name}..."
1289+
)
1290+
12791291
self._entity_state = OrchestrationEntityContext(instance_id)
12801292

12811293
if not new_events:
12821294
raise task.OrchestrationStateError(
12831295
"The new history event list must have at least one event in it."
12841296
)
1285-
12861297
ctx = _RuntimeOrchestrationContext(instance_id, self._registry, self._entity_state)
12871298
try:
12881299
# Rebuild local state by replaying old history into the orchestrator function
@@ -1314,13 +1325,15 @@ def execute(
13141325

13151326
except Exception as ex:
13161327
# Unhandled exceptions fail the orchestration
1328+
self._logger.debug(f"{instance_id}: Orchestration {orchestration_name} failed")
13171329
ctx.set_failed(ex)
13181330

13191331
if not ctx._is_complete:
13201332
task_count = len(ctx._pending_tasks)
13211333
event_count = len(ctx._pending_events)
13221334
self._logger.info(
1323-
f"{instance_id}: Orchestrator yielded with {task_count} task(s) and {event_count} event(s) outstanding."
1335+
f"{instance_id}: Orchestrator {orchestration_name} yielded with {task_count} task(s) "
1336+
f"and {event_count} event(s) outstanding."
13241337
)
13251338
elif (
13261339
ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
@@ -1329,7 +1342,7 @@ def execute(
13291342
ctx._completion_status
13301343
)
13311344
self._logger.info(
1332-
f"{instance_id}: Orchestration completed with status: {completion_status_str}"
1345+
f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}"
13331346
)
13341347

13351348
actions = ctx.get_actions()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ classifiers = [
2121
"Programming Language :: Python :: 3",
2222
"License :: OSI Approved :: MIT License",
2323
]
24-
requires-python = ">=3.9"
24+
requires-python = ">=3.10"
2525
license = {file = "LICENSE"}
2626
readme = "README.md"
2727
dependencies = [

tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,34 @@ def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
175175
assert activity_counter == 30
176176

177177

178+
def test_sub_orchestrator_by_name():
179+
sub_orchestrator_counter = 0
180+
181+
def orchestrator_child(ctx: task.OrchestrationContext, _):
182+
nonlocal sub_orchestrator_counter
183+
sub_orchestrator_counter += 1
184+
185+
def parent_orchestrator(ctx: task.OrchestrationContext, _):
186+
yield ctx.call_sub_orchestrator("orchestrator_child")
187+
188+
# Start a worker, which will connect to the sidecar in a background thread
189+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
190+
taskhub=taskhub_name, token_credential=None) as w:
191+
w.add_orchestrator(orchestrator_child)
192+
w.add_orchestrator(parent_orchestrator)
193+
w.start()
194+
195+
task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
196+
taskhub=taskhub_name, token_credential=None)
197+
id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=None)
198+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
199+
200+
assert state is not None
201+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
202+
assert state.failure_details is None
203+
assert sub_orchestrator_counter == 1
204+
205+
178206
def test_wait_for_multiple_external_events():
179207
def orchestrator(ctx: task.OrchestrationContext, _):
180208
a = yield ctx.wait_for_external_event('A')
@@ -267,7 +295,7 @@ def orchestrator(ctx: task.OrchestrationContext, _):
267295
# try:
268296
# state = task_hub_client.wait_for_orchestration_completion(id, timeout=3)
269297
# assert False, "Orchestration should not have completed"
270-
# except TimeoutError:
298+
# except (TimeoutError, _InactiveRpcError):
271299
# pass
272300

273301
# # Resume the orchestration and wait for it to complete

0 commit comments

Comments
 (0)