Skip to content

Commit d5744f9

Browse files
tconley1428cretz
andcommitted
Adding trace identification numbers (#888)
* Adding trace identification numbers to select log statements to allow for easier categorization in log handlers * Reformat * Fix import formatting * Remove key and enum definitions * Fix copy paste bug * Switch workflow test to logcapturer * Remove test artifact * Fixing test * Update temporalio/types.py Co-authored-by: Chad Retz <[email protected]> --------- Co-authored-by: Chad Retz <[email protected]>
1 parent 1ac0a31 commit d5744f9

File tree

4 files changed

+81
-4
lines changed

4 files changed

+81
-4
lines changed

temporalio/worker/_activity.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,11 +353,15 @@ async def _run_activity(
353353
):
354354
# Downgrade log level to DEBUG for BENIGN application errors.
355355
temporalio.activity.logger.debug(
356-
"Completing activity as failed", exc_info=True
356+
"Completing activity as failed",
357+
exc_info=True,
358+
extra={"__temporal_error_identifier": "ActivityFailure"},
357359
)
358360
else:
359361
temporalio.activity.logger.warning(
360-
"Completing activity as failed", exc_info=True
362+
"Completing activity as failed",
363+
exc_info=True,
364+
extra={"__temporal_error_identifier": "ActivityFailure"},
361365
)
362366
await self._data_converter.encode_failure(
363367
err, completion.result.failed.failure

temporalio/worker/_workflow_instance.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,10 @@ def activate(
447447
logger.warning(
448448
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
449449
exc_info=activation_err,
450-
extra={"temporal_workflow": self._info._logger_details()},
450+
extra={
451+
"temporal_workflow": self._info._logger_details(),
452+
"__temporal_error_identifier": "WorkflowTaskFailure",
453+
},
451454
)
452455
# Set completion failure
453456
self._current_completion.failed.failure.SetInParent()

tests/worker/test_activity.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,6 +1382,43 @@ def assert_activity_application_error(
13821382
assert isinstance(ret, ApplicationError)
13831383
return ret
13841384

1385+
1386+
class CustomLogHandler(logging.Handler):
1387+
def __init__(self):
1388+
super().__init__()
1389+
self._trace_identifiers = 0
1390+
1391+
def emit(self, record: logging.LogRecord) -> None:
1392+
if (
1393+
hasattr(record, "__temporal_error_identifier")
1394+
and getattr(record, "__temporal_error_identifier") == "ActivityFailure"
1395+
):
1396+
assert record.msg.startswith("Completing activity as failed")
1397+
self._trace_identifiers += 1
1398+
return None
1399+
1400+
1401+
async def test_activity_failure_trace_identifier(
1402+
client: Client, worker: ExternalWorker
1403+
):
1404+
@activity.defn
1405+
async def raise_error():
1406+
raise RuntimeError("oh no!")
1407+
1408+
handler = CustomLogHandler()
1409+
activity.logger.base_logger.addHandler(handler)
1410+
1411+
try:
1412+
with pytest.raises(WorkflowFailureError) as err:
1413+
await _execute_workflow_with_activity(client, worker, raise_error)
1414+
assert (
1415+
str(assert_activity_application_error(err.value)) == "RuntimeError: oh no!"
1416+
)
1417+
assert handler._trace_identifiers == 1
1418+
1419+
finally:
1420+
activity.logger.base_logger.removeHandler(CustomLogHandler())
1421+
13851422
async def test_activity_reset(client: Client, worker: ExternalWorker):
13861423

13871424
@activity.defn

tests/worker/test_workflow.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1961,8 +1961,13 @@ def logs_captured(self, *loggers: logging.Logger):
19611961
l.setLevel(prev_levels[i])
19621962

19631963
def find_log(self, starts_with: str) -> Optional[logging.LogRecord]:
1964+
return self.find(lambda l: l.message.startswith(starts_with))
1965+
1966+
def find(
1967+
self, pred: Callable[[logging.LogRecord], bool]
1968+
) -> Optional[logging.LogRecord]:
19641969
for record in cast(List[logging.LogRecord], self.log_queue.queue):
1965-
if record.message.startswith(starts_with):
1970+
if pred(record):
19661971
return record
19671972
return None
19681973

@@ -2058,6 +2063,7 @@ async def run(self) -> None:
20582063
if not task_fail_once_workflow_has_failed:
20592064
task_fail_once_workflow_has_failed = True
20602065
raise RuntimeError("Intentional workflow task failure")
2066+
task_fail_once_workflow_has_failed = False
20612067

20622068
# Execute activity that will fail once
20632069
await workflow.execute_activity(
@@ -7975,6 +7981,33 @@ async def test_quick_activity_swallows_cancellation(client: Client):
79757981
temporalio.worker._workflow_instance._raise_on_cancelling_completed_activity_override = False
79767982

79777983

7984+
async def test_workflow_logging_trace_identifier(client: Client):
7985+
with LogCapturer().logs_captured(
7986+
temporalio.worker._workflow_instance.logger
7987+
) as capturer:
7988+
async with new_worker(
7989+
client,
7990+
TaskFailOnceWorkflow,
7991+
activities=[task_fail_once_activity],
7992+
) as worker:
7993+
await client.execute_workflow(
7994+
TaskFailOnceWorkflow.run,
7995+
id=f"workflow_failure_trace_identifier",
7996+
task_queue=worker.task_queue,
7997+
)
7998+
7999+
def workflow_failure(l: logging.LogRecord):
8000+
if (
8001+
hasattr(l, "__temporal_error_identifier")
8002+
and getattr(l, "__temporal_error_identifier") == "WorkflowTaskFailure"
8003+
):
8004+
assert l.msg.startswith("Failed activation on workflow")
8005+
return True
8006+
return False
8007+
8008+
assert capturer.find(workflow_failure) is not None
8009+
8010+
79788011
@activity.defn
79798012
def use_in_workflow() -> bool:
79808013
return workflow.in_workflow()

0 commit comments

Comments
 (0)