Skip to content

Commit e4969c0

Browse files
romank0tconley1428
andcommitted
Adds proper failure decoding in workflow activation (#1192)
* Adds proper failure decoding in workflow activation * fixes formatting * Update generator to include cause --------- Co-authored-by: Tim Conley <[email protected]> Co-authored-by: tconley1428 <[email protected]>
1 parent 37e8975 commit e4969c0

File tree

3 files changed

+58
-5
lines changed

3 files changed

+58
-5
lines changed

scripts/gen_payload_visitor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ def walk(self, desc: Descriptor) -> bool:
171171
if key in self.generated:
172172
return self.generated[key]
173173
if key in self.in_progress:
174-
# Break cycles; if another path proves this node needed, we'll revisit
175-
return False
174+
# Break cycles; Assume the child will be needed (Used by Failure -> Cause)
175+
return True
176176

177177
has_payload = False
178178
self.in_progress.add(key)

temporalio/bridge/_visitor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ async def _visit_temporal_api_failure_v1_ResetWorkflowFailureInfo(self, fs, o):
7474
async def _visit_temporal_api_failure_v1_Failure(self, fs, o):
7575
if o.HasField("encoded_attributes"):
7676
await self._visit_temporal_api_common_v1_Payload(fs, o.encoded_attributes)
77+
if o.HasField("cause"):
78+
await self._visit_temporal_api_failure_v1_Failure(fs, o.cause)
7779
if o.HasField("application_failure_info"):
7880
await self._visit_temporal_api_failure_v1_ApplicationFailureInfo(
7981
fs, o.application_failure_info

tests/worker/test_workflow.py

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@
1515
import typing
1616
import uuid
1717
from abc import ABC, abstractmethod
18-
from contextlib import contextmanager
1918
from dataclasses import dataclass
2019
from datetime import datetime, timedelta, timezone
2120
from enum import IntEnum
2221
from functools import partial
2322
from typing import (
2423
Any,
2524
Awaitable,
26-
Callable,
2725
Dict,
2826
List,
2927
Mapping,
@@ -8466,7 +8464,7 @@ async def test_search_attribute_codec(client: Client, env_type: str):
84668464
result = await client.execute_workflow(
84678465
SearchAttributeCodecParentWorkflow.run,
84688466
"Temporal",
8469-
id=f"encryption-workflow-id",
8467+
id="encryption-workflow-id",
84708468
task_queue=worker.task_queue,
84718469
search_attributes=TypedSearchAttributes(
84728470
[
@@ -8476,3 +8474,56 @@ async def test_search_attribute_codec(client: Client, env_type: str):
84768474
]
84778475
),
84788476
)
8477+
8478+
8479+
@activity.defn
8480+
async def activity_that_fails_with_details() -> str:
8481+
"""Activity that raises an ApplicationError with custom details."""
8482+
raise ApplicationError(
8483+
"Activity failed intentionally",
8484+
"detail1",
8485+
{"error_code": "NOT_FOUND", "id": "test-123"},
8486+
non_retryable=True,
8487+
)
8488+
8489+
8490+
@workflow.defn
8491+
class WorkflowWithFailingActivityAndCodec:
8492+
@workflow.run
8493+
async def run(self) -> str:
8494+
try:
8495+
return await workflow.execute_activity(
8496+
activity_that_fails_with_details,
8497+
schedule_to_close_timeout=timedelta(seconds=3),
8498+
retry_policy=RetryPolicy(maximum_attempts=1),
8499+
)
8500+
except ActivityError as err:
8501+
assert isinstance(err.cause, ApplicationError)
8502+
assert err.cause.message == "Activity failed intentionally"
8503+
assert len(err.cause.details) == 2
8504+
assert err.cause.details[0] == "detail1"
8505+
assert err.cause.details[1] == {"error_code": "NOT_FOUND", "id": "test-123"}
8506+
return "Handled encrypted failure successfully"
8507+
8508+
8509+
async def test_activity_failure_with_encoded_payload_is_decoded_in_workflow(
8510+
client: Client,
8511+
):
8512+
config = client.config()
8513+
config["data_converter"] = dataclasses.replace(
8514+
temporalio.converter.default(), payload_codec=EncryptionCodec()
8515+
)
8516+
client = Client(**config)
8517+
8518+
async with new_worker(
8519+
client,
8520+
WorkflowWithFailingActivityAndCodec,
8521+
activities=[activity_that_fails_with_details],
8522+
) as worker:
8523+
result = await client.execute_workflow(
8524+
WorkflowWithFailingActivityAndCodec.run,
8525+
id=f"workflow-{uuid.uuid4()}",
8526+
task_queue=worker.task_queue,
8527+
run_timeout=timedelta(seconds=5),
8528+
)
8529+
assert result == "Handled encrypted failure successfully"

0 commit comments

Comments
 (0)