Skip to content

Commit 231cc67

Browse files
romank0tconley1428
andauthored
Adds proper failure decoding in workflow activation (#1192)
* Adds proper failure decoding in workflow activation * fixes formatting * Update generator to include cause --------- Co-authored-by: Tim Conley <[email protected]> Co-authored-by: tconley1428 <[email protected]>
1 parent e39e8f2 commit 231cc67

File tree

3 files changed

+58
-5
lines changed

3 files changed

+58
-5
lines changed

scripts/gen_payload_visitor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ def walk(self, desc: Descriptor) -> bool:
172172
if key in self.generated:
173173
return self.generated[key]
174174
if key in self.in_progress:
175-
# Break cycles; if another path proves this node needed, we'll revisit
176-
return False
175+
# Break cycles; Assume the child will be needed (Used by Failure -> Cause)
176+
return True
177177

178178
has_payload = False
179179
self.in_progress.add(key)

temporalio/bridge/_visitor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ async def _visit_temporal_api_failure_v1_ResetWorkflowFailureInfo(self, fs, o):
7474
async def _visit_temporal_api_failure_v1_Failure(self, fs, o):
7575
if o.HasField("encoded_attributes"):
7676
await self._visit_temporal_api_common_v1_Payload(fs, o.encoded_attributes)
77+
if o.HasField("cause"):
78+
await self._visit_temporal_api_failure_v1_Failure(fs, o.cause)
7779
if o.HasField("application_failure_info"):
7880
await self._visit_temporal_api_failure_v1_ApplicationFailureInfo(
7981
fs, o.application_failure_info

tests/worker/test_workflow.py

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@
1515
import typing
1616
import uuid
1717
from abc import ABC, abstractmethod
18-
from contextlib import contextmanager
1918
from dataclasses import dataclass
2019
from datetime import datetime, timedelta, timezone
2120
from enum import IntEnum
2221
from functools import partial
2322
from typing import (
2423
Any,
2524
Awaitable,
26-
Callable,
2725
Dict,
2826
List,
2927
Mapping,
@@ -8383,7 +8381,7 @@ async def test_search_attribute_codec(client: Client, env_type: str):
83838381
result = await client.execute_workflow(
83848382
SearchAttributeCodecParentWorkflow.run,
83858383
"Temporal",
8386-
id=f"encryption-workflow-id",
8384+
id="encryption-workflow-id",
83878385
task_queue=worker.task_queue,
83888386
search_attributes=TypedSearchAttributes(
83898387
[
@@ -8393,3 +8391,56 @@ async def test_search_attribute_codec(client: Client, env_type: str):
83938391
]
83948392
),
83958393
)
8394+
8395+
8396+
@activity.defn
8397+
async def activity_that_fails_with_details() -> str:
8398+
"""Activity that raises an ApplicationError with custom details."""
8399+
raise ApplicationError(
8400+
"Activity failed intentionally",
8401+
"detail1",
8402+
{"error_code": "NOT_FOUND", "id": "test-123"},
8403+
non_retryable=True,
8404+
)
8405+
8406+
8407+
@workflow.defn
8408+
class WorkflowWithFailingActivityAndCodec:
8409+
@workflow.run
8410+
async def run(self) -> str:
8411+
try:
8412+
return await workflow.execute_activity(
8413+
activity_that_fails_with_details,
8414+
schedule_to_close_timeout=timedelta(seconds=3),
8415+
retry_policy=RetryPolicy(maximum_attempts=1),
8416+
)
8417+
except ActivityError as err:
8418+
assert isinstance(err.cause, ApplicationError)
8419+
assert err.cause.message == "Activity failed intentionally"
8420+
assert len(err.cause.details) == 2
8421+
assert err.cause.details[0] == "detail1"
8422+
assert err.cause.details[1] == {"error_code": "NOT_FOUND", "id": "test-123"}
8423+
return "Handled encrypted failure successfully"
8424+
8425+
8426+
async def test_activity_failure_with_encoded_payload_is_decoded_in_workflow(
8427+
client: Client,
8428+
):
8429+
config = client.config()
8430+
config["data_converter"] = dataclasses.replace(
8431+
temporalio.converter.default(), payload_codec=EncryptionCodec()
8432+
)
8433+
client = Client(**config)
8434+
8435+
async with new_worker(
8436+
client,
8437+
WorkflowWithFailingActivityAndCodec,
8438+
activities=[activity_that_fails_with_details],
8439+
) as worker:
8440+
result = await client.execute_workflow(
8441+
WorkflowWithFailingActivityAndCodec.run,
8442+
id=f"workflow-{uuid.uuid4()}",
8443+
task_queue=worker.task_queue,
8444+
run_timeout=timedelta(seconds=5),
8445+
)
8446+
assert result == "Handled encrypted failure successfully"

0 commit comments

Comments
 (0)