Skip to content

Commit 0e95aa8

Browse files
andystaples, Copilot, and berndverst
authored
Add/increase execution logging (#70)
* Add/increase execution logging * Update durabletask/worker.py Co-authored-by: Copilot <[email protected]> * Changes - Set level to debug for execution logs - Allow logging config in durabletask-python - Update docs for logging * Revert one more log line * Type-hint log_handler --------- Co-authored-by: Copilot <[email protected]> Co-authored-by: Bernd Verst <[email protected]>
1 parent f5de04f commit 0e95aa8

File tree

4 files changed

+50
-8
lines changed

4 files changed

+50
-8
lines changed

docs/features.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,21 @@ Orchestrations can be suspended using the `suspend_orchestration` client API and
148148

149149
### Retry policies
150150

151-
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
151+
Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
152+
153+
### Logging configuration
154+
155+
Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and DurableTaskSchedulerClient for durabletask-azuremanaged) accept a log_handler and log_formatter object from `logging`. These can be used to customize verbosity, output location, and format of logs emitted by these sources.
156+
157+
For example, to output logs to a file called `worker.log` at level `DEBUG`, the following syntax might apply:
158+
159+
```python
160+
log_handler = logging.FileHandler('durable.log', encoding='utf-8')
161+
log_handler.setLevel(logging.DEBUG)
162+
163+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
164+
taskhub=taskhub_name, token_credential=credential, log_handler=log_handler) as w:
165+
```
166+
167+
**NOTE**
168+
The worker and client output many logs at the `DEBUG` level that will be useful when understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt a repro of the issue with debug logging enabled.

durabletask-azuremanaged/durabletask/azuremanaged/client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
import logging
5+
46
from typing import Optional
57

68
from azure.core.credentials import TokenCredential
@@ -18,7 +20,9 @@ def __init__(self, *,
1820
taskhub: str,
1921
token_credential: Optional[TokenCredential],
2022
secure_channel: bool = True,
21-
default_version: Optional[str] = None):
23+
default_version: Optional[str] = None,
24+
log_handler: Optional[logging.Handler] = None,
25+
log_formatter: Optional[logging.Formatter] = None):
2226

2327
if not taskhub:
2428
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
@@ -31,5 +35,7 @@ def __init__(self, *,
3135
host_address=host_address,
3236
secure_channel=secure_channel,
3337
metadata=None,
38+
log_handler=log_handler,
39+
log_formatter=log_formatter,
3440
interceptors=interceptors,
3541
default_version=default_version)

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
import logging
5+
46
from typing import Optional
57

68
from azure.core.credentials import TokenCredential
@@ -28,6 +30,8 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
2830
concurrency_options (Optional[ConcurrencyOptions], optional): Configuration
2931
for controlling worker concurrency limits. If None, default concurrency
3032
settings will be used.
33+
log_handler (Optional[logging.Handler], optional): Custom logging handler for worker logs.
34+
log_formatter (Optional[logging.Formatter], optional): Custom log formatter for worker logs.
3135
3236
Raises:
3337
ValueError: If taskhub is empty or None.
@@ -52,12 +56,15 @@ class DurableTaskSchedulerWorker(TaskHubGrpcWorker):
5256
parameter is set to None since authentication is handled by the
5357
DTS interceptor.
5458
"""
59+
5560
def __init__(self, *,
5661
host_address: str,
5762
taskhub: str,
5863
token_credential: Optional[TokenCredential],
5964
secure_channel: bool = True,
60-
concurrency_options: Optional[ConcurrencyOptions] = None):
65+
concurrency_options: Optional[ConcurrencyOptions] = None,
66+
log_handler: Optional[logging.Handler] = None,
67+
log_formatter: Optional[logging.Formatter] = None):
6168

6269
if not taskhub:
6370
raise ValueError("The taskhub value cannot be empty.")
@@ -70,5 +77,7 @@ def __init__(self, *,
7077
host_address=host_address,
7178
secure_channel=secure_channel,
7279
metadata=None,
80+
log_handler=log_handler,
81+
log_formatter=log_formatter,
7382
interceptors=interceptors,
7483
concurrency_options=concurrency_options)

durabletask/worker.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ class TaskHubGrpcWorker:
246246
Defaults to the value from environment variables or localhost.
247247
metadata (Optional[list[tuple[str, str]]], optional): gRPC metadata to include with
248248
requests. Used for authentication and routing. Defaults to None.
249-
log_handler (optional): Custom logging handler for worker logs. Defaults to None.
249+
log_handler (optional[logging.Handler]): Custom logging handler for worker logs. Defaults to None.
250250
log_formatter (Optional[logging.Formatter], optional): Custom log formatter.
251251
Defaults to None.
252252
secure_channel (bool, optional): Whether to use a secure gRPC channel (TLS).
@@ -314,7 +314,7 @@ def __init__(
314314
*,
315315
host_address: Optional[str] = None,
316316
metadata: Optional[list[tuple[str, str]]] = None,
317-
log_handler=None,
317+
log_handler: Optional[logging.Handler] = None,
318318
log_formatter: Optional[logging.Formatter] = None,
319319
secure_channel: bool = False,
320320
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
@@ -1236,13 +1236,21 @@ def execute(
12361236
old_events: Sequence[pb.HistoryEvent],
12371237
new_events: Sequence[pb.HistoryEvent],
12381238
) -> ExecutionResults:
1239+
orchestration_name = "<unknown>"
1240+
orchestration_started_events = [e for e in old_events if e.HasField("executionStarted")]
1241+
if len(orchestration_started_events) >= 1:
1242+
orchestration_name = orchestration_started_events[0].executionStarted.name
1243+
1244+
self._logger.debug(
1245+
f"{instance_id}: Beginning replay for orchestrator {orchestration_name}..."
1246+
)
1247+
12391248
self._entity_state = OrchestrationEntityContext(instance_id)
12401249

12411250
if not new_events:
12421251
raise task.OrchestrationStateError(
12431252
"The new history event list must have at least one event in it."
12441253
)
1245-
12461254
ctx = _RuntimeOrchestrationContext(instance_id, self._registry, self._entity_state)
12471255
try:
12481256
# Rebuild local state by replaying old history into the orchestrator function
@@ -1274,13 +1282,15 @@ def execute(
12741282

12751283
except Exception as ex:
12761284
# Unhandled exceptions fail the orchestration
1285+
self._logger.debug(f"{instance_id}: Orchestration {orchestration_name} failed")
12771286
ctx.set_failed(ex)
12781287

12791288
if not ctx._is_complete:
12801289
task_count = len(ctx._pending_tasks)
12811290
event_count = len(ctx._pending_events)
12821291
self._logger.info(
1283-
f"{instance_id}: Orchestrator yielded with {task_count} task(s) and {event_count} event(s) outstanding."
1292+
f"{instance_id}: Orchestrator {orchestration_name} yielded with {task_count} task(s) "
1293+
f"and {event_count} event(s) outstanding."
12841294
)
12851295
elif (
12861296
ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
@@ -1289,7 +1299,7 @@ def execute(
12891299
ctx._completion_status
12901300
)
12911301
self._logger.info(
1292-
f"{instance_id}: Orchestration completed with status: {completion_status_str}"
1302+
f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}"
12931303
)
12941304

12951305
actions = ctx.get_actions()

0 commit comments

Comments (0)