Skip to content

Commit cd9e47e

Browse files
committed
Add/increase execution logging
1 parent eba1e8e commit cd9e47e

File tree

1 file changed

+17
-7
lines changed

1 file changed

+17
-7
lines changed

durabletask/worker.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,13 +1233,21 @@ def execute(
12331233
old_events: Sequence[pb.HistoryEvent],
12341234
new_events: Sequence[pb.HistoryEvent],
12351235
) -> ExecutionResults:
1236+
orchestration_name = "<unknown>"
1237+
orchestration_started_events = [e for e in old_events if e.HasField("executionStarted")]
1238+
if len(orchestration_started_events) > 1:
1239+
orchestration_name = orchestration_started_events[0].executionStarted.name
1240+
1241+
self._logger.info(
1242+
f"{instance_id}: Beginning replay for orchestrator {orchestration_name}..."
1243+
)
1244+
12361245
self._entity_state = OrchestrationEntityContext(instance_id)
12371246

12381247
if not new_events:
12391248
raise task.OrchestrationStateError(
12401249
"The new history event list must have at least one event in it."
12411250
)
1242-
12431251
ctx = _RuntimeOrchestrationContext(instance_id, self._registry, self._entity_state)
12441252
try:
12451253
# Rebuild local state by replaying old history into the orchestrator function
@@ -1271,13 +1279,15 @@ def execute(
12711279

12721280
except Exception as ex:
12731281
# Unhandled exceptions fail the orchestration
1282+
self._logger.info(f"{instance_id}: Orchestration {orchestration_name} failed")
12741283
ctx.set_failed(ex)
12751284

12761285
if not ctx._is_complete:
12771286
task_count = len(ctx._pending_tasks)
12781287
event_count = len(ctx._pending_events)
12791288
self._logger.info(
1280-
f"{instance_id}: Orchestrator yielded with {task_count} task(s) and {event_count} event(s) outstanding."
1289+
f"{instance_id}: Orchestrator {orchestration_name} yielded with {task_count} task(s) "
1290+
f"and {event_count} event(s) outstanding."
12811291
)
12821292
elif (
12831293
ctx._completion_status and ctx._completion_status is not pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
@@ -1286,7 +1296,7 @@ def execute(
12861296
ctx._completion_status
12871297
)
12881298
self._logger.info(
1289-
f"{instance_id}: Orchestration completed with status: {completion_status_str}"
1299+
f"{instance_id}: Orchestration {orchestration_name} completed with status: {completion_status_str}"
12901300
)
12911301

12921302
actions = ctx.get_actions()
@@ -1754,7 +1764,7 @@ def execute(
17541764
encoded_input: Optional[str],
17551765
) -> Optional[str]:
17561766
"""Executes an activity function and returns the serialized result, if any."""
1757-
self._logger.debug(
1767+
self._logger.info(
17581768
f"{orchestration_id}/{task_id}: Executing activity '{name}'..."
17591769
)
17601770
fn = self._registry.get_activity(name)
@@ -1773,7 +1783,7 @@ def execute(
17731783
shared.to_json(activity_output) if activity_output is not None else None
17741784
)
17751785
chars = len(encoded_output) if encoded_output else 0
1776-
self._logger.debug(
1786+
self._logger.info(
17771787
f"{orchestration_id}/{task_id}: Activity '{name}' completed successfully with {chars} char(s) of encoded output."
17781788
)
17791789
return encoded_output
@@ -1793,7 +1803,7 @@ def execute(
17931803
encoded_input: Optional[str],
17941804
) -> Optional[str]:
17951805
"""Executes an entity function and returns the serialized result, if any."""
1796-
self._logger.debug(
1806+
self._logger.info(
17971807
f"{orchestration_id}: Executing entity '{entity_id}'..."
17981808
)
17991809
fn = self._registry.get_entity(entity_id.entity)
@@ -1827,7 +1837,7 @@ def execute(
18271837
shared.to_json(entity_output) if entity_output is not None else None
18281838
)
18291839
chars = len(encoded_output) if encoded_output else 0
1830-
self._logger.debug(
1840+
self._logger.info(
18311841
f"{orchestration_id}: Entity '{entity_id}' completed successfully with {chars} char(s) of encoded output."
18321842
)
18331843
return encoded_output

0 commit comments

Comments
 (0)