Skip to content

Commit c997236

Browse files
committed
.
1 parent 428fe03 commit c997236

File tree

1 file changed

+112
-118
lines changed

1 file changed

+112
-118
lines changed

tests/test_serialization_context.py

Lines changed: 112 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import dataclasses
55
import inspect
6+
import traceback
67
import uuid
78
from dataclasses import dataclass, field
89
from datetime import timedelta
@@ -34,7 +35,7 @@
3435
WithSerializationContext,
3536
WorkflowSerializationContext,
3637
)
37-
from temporalio.exceptions import ActivityError, ApplicationError
38+
from temporalio.exceptions import ApplicationError
3839
from temporalio.worker import Worker
3940
from temporalio.worker._workflow_instance import UnsandboxedWorkflowRunner
4041

@@ -968,10 +969,14 @@ async def run(self) -> Never:
968969
raise Exception("Unreachable")
969970

970971

972+
failure_converter_test_trace: list[TraceItem] = []
973+
974+
971975
class ContextFailureConverter(DefaultFailureConverter, WithSerializationContext):
972976
def __init__(self):
973977
super().__init__(encode_common_attributes=False)
974978
self.context: Optional[SerializationContext] = None
979+
self.trace: list[TraceItem] = failure_converter_test_trace
975980

976981
def with_context(
977982
self, context: Optional[SerializationContext]
@@ -986,82 +991,62 @@ def to_failure(
986991
payload_converter: PayloadConverter,
987992
failure: Failure,
988993
) -> None:
989-
print("🌈 to_failure")
990-
if isinstance(exception, ApplicationError) and exception.details:
991-
if isinstance(
992-
self.context,
993-
(WorkflowSerializationContext, ActivitySerializationContext),
994-
):
995-
context_type = (
996-
"workflow"
997-
if isinstance(self.context, WorkflowSerializationContext)
998-
else "activity"
999-
)
1000-
print(
1001-
f" 🌈 to_failure appending {context_type}: {exception.details}"
1002-
)
1003-
exception.details[0]["items"].append(
1004-
dataclasses.asdict(
1005-
TraceItem(
1006-
context_type=context_type,
1007-
in_workflow=workflow.in_workflow(),
1008-
method="to_failure",
1009-
context=dataclasses.asdict(self.context),
1010-
)
1011-
)
1012-
)
1013-
else:
1014-
raise TypeError(f"self.context is {type(self.context)}")
1015-
994+
print(f"🌈 to_failure: {exception.__class__}")
995+
if isinstance(self.context, WorkflowSerializationContext):
996+
context_type = "workflow"
997+
elif isinstance(self.context, ActivitySerializationContext):
998+
context_type = "activity"
999+
else:
1000+
raise TypeError(f"self.context is {type(self.context)}")
1001+
1002+
self.trace.append(
1003+
TraceItem(
1004+
context_type=context_type,
1005+
in_workflow=workflow.in_workflow(),
1006+
method="to_failure",
1007+
context=dataclasses.asdict(self.context),
1008+
)
1009+
)
10161010
super().to_failure(exception, payload_converter, failure)
10171011

10181012
def from_failure(
10191013
self, failure: Failure, payload_converter: PayloadConverter
10201014
) -> BaseException:
10211015
print("🌈 from_failure")
1016+
print("\n".join(list(reversed(traceback.format_stack()))[:5]))
10221017
# Let the base class create the exception
1023-
exception = super().from_failure(failure, payload_converter)
1024-
print(f" 🌈 {exception.__class__}")
1025-
if isinstance(exception, ApplicationError) and exception.details:
1026-
if isinstance(
1027-
self.context,
1028-
(WorkflowSerializationContext, ActivitySerializationContext),
1029-
):
1030-
context_type = (
1031-
"workflow"
1032-
if isinstance(self.context, WorkflowSerializationContext)
1033-
else "activity"
1034-
)
1035-
print(
1036-
f" 🌈 from_failure appending {context_type}: {exception.details}"
1037-
)
1038-
exception.details[0]["items"].append(
1039-
dataclasses.asdict(
1040-
TraceItem(
1041-
context_type=context_type,
1042-
in_workflow=workflow.in_workflow(),
1043-
method="from_failure",
1044-
context=dataclasses.asdict(self.context),
1045-
)
1046-
)
1047-
)
1048-
else:
1049-
raise TypeError(f"self.context is {type(self.context)}")
1050-
return exception
1018+
if isinstance(self.context, WorkflowSerializationContext):
1019+
context_type = "workflow"
1020+
elif isinstance(self.context, ActivitySerializationContext):
1021+
context_type = "activity"
1022+
else:
1023+
raise TypeError(f"self.context is {type(self.context)}")
1024+
1025+
self.trace.append(
1026+
TraceItem(
1027+
context_type=context_type,
1028+
in_workflow=workflow.in_workflow(),
1029+
method="from_failure",
1030+
context=dataclasses.asdict(self.context),
1031+
# caller_location=get_caller_location(),
1032+
)
1033+
)
1034+
return super().from_failure(failure, payload_converter)
10511035

10521036

1053-
async def test_failure_conversion_with_context(client: Client):
1037+
async def test_failure_converter_with_context(client: Client):
10541038
print()
10551039
workflow_id = str(uuid.uuid4())
10561040
task_queue = str(uuid.uuid4())
10571041

1042+
data_converter = dataclasses.replace(
1043+
DataConverter.default,
1044+
failure_converter_class=ContextFailureConverter,
1045+
)
10581046
test_client = Client(
10591047
client.service_client,
10601048
namespace=client.namespace,
1061-
data_converter=dataclasses.replace(
1062-
DataConverter.default,
1063-
failure_converter_class=ContextFailureConverter,
1064-
),
1049+
data_converter=data_converter,
10651050
)
10661051
async with Worker(
10671052
test_client,
@@ -1076,67 +1061,76 @@ async def test_failure_conversion_with_context(client: Client):
10761061
id=workflow_id,
10771062
task_queue=task_queue,
10781063
)
1079-
except Exception as err:
1080-
assert isinstance(err, WorkflowFailureError)
1081-
assert isinstance(err.cause, ActivityError)
1082-
assert isinstance(err.cause.cause, ApplicationError)
1083-
pprint(err.cause.cause.details)
1084-
1085-
workflow_context = dataclasses.asdict(
1086-
WorkflowSerializationContext(
1087-
namespace="default",
1088-
workflow_id=workflow_id,
1089-
)
1064+
raise AssertionError("unreachable")
1065+
except WorkflowFailureError:
1066+
pass
1067+
1068+
assert isinstance(data_converter.failure_converter, ContextFailureConverter)
1069+
1070+
workflow_context = dataclasses.asdict(
1071+
WorkflowSerializationContext(
1072+
namespace="default",
1073+
workflow_id=workflow_id,
10901074
)
1091-
activity_context = dataclasses.asdict(
1092-
ActivitySerializationContext(
1093-
namespace="default",
1094-
workflow_id=workflow_id,
1095-
workflow_type="FailureContextWorkflow",
1096-
activity_type="failing_activity",
1097-
activity_task_queue=task_queue,
1098-
is_local=False,
1099-
)
1075+
)
1076+
activity_context = dataclasses.asdict(
1077+
ActivitySerializationContext(
1078+
namespace="default",
1079+
workflow_id=workflow_id,
1080+
workflow_type="FailureContextWorkflow",
1081+
activity_type="failing_activity",
1082+
activity_task_queue=task_queue,
1083+
is_local=False,
11001084
)
1101-
# 1. Exception raised in activity
1102-
# 2. outbound activity result to_failure(act, activity_ctx) appends and serializes
1103-
# 3. -> server -> WFT -> WF
1104-
# 4. inbound activity result from_failure(wf, activity_ctx, in_wf=False) deserializes and appends
1105-
# 5. outbound wf result to_failure(wf, in_wf=True) appends and serializes
1106-
# 6. inbound wf result from_failure(client, wf_context, in_wf=False)
1107-
if False:
1108-
assert_trace(
1109-
err.cause.cause.details,
1085+
)
1086+
# 1. Exception raised in activity
1087+
# 2. outbound activity result to_failure(act, activity_ctx) appends and serializes
1088+
# 3. -> server -> WFT -> WF
1089+
# 4. inbound activity result from_failure(wf, activity_ctx, in_wf=False) deserializes and appends
1090+
# 5. outbound wf result to_failure(wf, in_wf=True) appends and serializes
1091+
# 6. inbound wf result from_failure(client, wf_context, in_wf=False)
1092+
if True:
1093+
assert_trace(
1094+
data_converter.failure_converter.trace,
1095+
[
1096+
TraceItem(
1097+
context_type="activity",
1098+
context=activity_context,
1099+
in_workflow=False,
1100+
method="to_failure", # outbound activity result
1101+
)
1102+
]
1103+
+ (
11101104
[
1111-
dataclasses.asdict(d)
1112-
for d in [
1113-
TraceItem(
1114-
context_type="activity",
1115-
context=activity_context,
1116-
in_workflow=False,
1117-
method="to_failure", # outbound activity result
1118-
),
1119-
TraceItem(
1120-
context_type="activity",
1121-
context=activity_context,
1122-
in_workflow=False,
1123-
method="from_failure", # inbound activity result
1124-
),
1125-
TraceItem(
1126-
context_type="workflow",
1127-
context=workflow_context,
1128-
in_workflow=True,
1129-
method="to_failure", # outbound workflow result
1130-
),
1131-
TraceItem(
1132-
context_type="workflow",
1133-
context=workflow_context,
1134-
in_workflow=False,
1135-
method="from_failure", # inbound workflow result
1136-
),
1137-
]
1138-
],
1105+
TraceItem(
1106+
context_type="activity",
1107+
context=activity_context,
1108+
in_workflow=False,
1109+
method="from_failure", # inbound activity result
1110+
)
1111+
]
1112+
* 2 # from_failure deserializes the error and error cause
11391113
)
1114+
+ [
1115+
TraceItem(
1116+
context_type="workflow",
1117+
context=workflow_context,
1118+
in_workflow=True,
1119+
method="to_failure", # outbound workflow result
1120+
)
1121+
]
1122+
+ (
1123+
[
1124+
TraceItem(
1125+
context_type="workflow",
1126+
context=workflow_context,
1127+
in_workflow=False,
1128+
method="from_failure", # inbound workflow result
1129+
)
1130+
]
1131+
* 2 # from_failure deserializes the error and error cause
1132+
),
1133+
)
11401134

11411135

11421136
class ContextCodec(PayloadCodec, WithSerializationContext):

0 commit comments

Comments
 (0)