
Commit 3036a8a

Rename options and move disablement to worker
1 parent 2efbb2c · commit 3036a8a

3 files changed: +71 -58 lines

temporalio/converter.py

Lines changed: 14 additions & 38 deletions
@@ -1209,43 +1209,23 @@ def __init__(self) -> None:
 
 @dataclass(frozen=True)
 class PayloadLimitsConfig:
-    """Configuration for when uploaded payload sizes exceed the Temporal server's limits."""
+    """Configuration for when payload sizes exceed limits."""
 
-    memo_upload_error_disabled: bool = False
-    """Field indiciating that the memo size checks should be disabled in the SDK.
-
-    A value of False will cause the SDK to fail tasks that attempt to upload memos
-    with a size that is over the Temporal server memo limit. A value of True will
-    disable memo size checks in the SDK, allowing it to attempt to upload memos
-    even if their size is over the Temporal server limit.
-
-    The default value is False."""
-
-    memo_upload_warning_limit: int = 2 * 1024
+    memo_size_warning: int = 2 * 1024
     """The limit (in bytes) at which a memo size warning is logged."""
 
-    payload_upload_error_disabled: bool = False
-    """Field indiciating that the payload size checks should be disabled in the SDK.
-
-    A value of False will cause the SDK to fail tasks that attempt to upload payloads
-    with a size that is over the Temporal server payloads limit. A value of True will
-    disable payload size checks in the SDK, allowing it to attempt to upload payloads
-    even if their size is over the Temporal server limit.
-
-    The default value is False."""
-
-    payload_upload_warning_limit: int = 512 * 1024
+    payload_size_warning: int = 512 * 1024
     """The limit (in bytes) at which a payload size warning is logged."""
 
 
 class PayloadSizeWarning(RuntimeWarning):
-    """The size of encoded payloads is above the warning limit."""
+    """The size of payloads is above the warning limit."""
 
 
 @dataclass
 class _PayloadErrorLimits:
-    memo_upload_error_limit: int
-    payload_upload_error_limit: int
+    memo_size_error: int
+    payload_size_error: int
 
 
 @dataclass(frozen=True)
@@ -1277,9 +1257,9 @@ class DataConverter(WithSerializationContext):
     default: ClassVar[DataConverter]
     """Singleton default data converter."""
 
-    _memo_upload_error_limit: int = 0
+    _memo_size_error_limit: int = 0
 
-    _payload_upload_error_limit: int = 0
+    _payload_size_error_limit: int = 0
 
     def __post_init__(self) -> None:  # noqa: D105
         object.__setattr__(self, "payload_converter", self.payload_converter_class())
@@ -1385,12 +1365,8 @@ def with_context(self, context: SerializationContext) -> Self:
     def _with_payload_error_limits(self, options: _PayloadErrorLimits) -> DataConverter:
         return dataclasses.replace(
             self,
-            _memo_upload_error_limit=0
-            if self.payload_limits.memo_upload_error_disabled
-            else options.memo_upload_error_limit,
-            _payload_upload_error_limit=0
-            if self.payload_limits.payload_upload_error_disabled
-            else options.payload_upload_error_limit,
+            _memo_size_error_limit=options.memo_size_error,
+            _payload_size_error_limit=options.payload_size_error,
         )
 
     async def _decode_memo(
@@ -1436,9 +1412,9 @@ async def _encode_memo_existing(
         # Memos have their field payloads validated all together in one unit
         DataConverter._validate_limits(
             payloads,
-            self._memo_upload_error_limit,
+            self._memo_size_error_limit,
             "[TMPRL1103] Attempted to upload memo with size that exceeded the error limit.",
-            self.payload_limits.memo_upload_warning_limit,
+            self.payload_limits.memo_size_warning,
             "[TMPRL1103] Attempted to upload memo with size that exceeded the warning limit.",
         )
 
@@ -1525,9 +1501,9 @@ def _validate_payload_limits(
         ):
             DataConverter._validate_limits(
                 payloads,
-                self._payload_upload_error_limit,
+                self._payload_size_error_limit,
                 "[TMPRL1103] Attempted to upload payloads with size that exceeded the error limit.",
-                self.payload_limits.payload_upload_warning_limit,
+                self.payload_limits.payload_size_warning,
                 "[TMPRL1103] Attempted to upload payloads with size that exceeded the warning limit.",
            )
 
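Usage note (not part of the diff): a minimal sketch of how a client could opt into the renamed warning thresholds on its data converter, mirroring the pattern used in the tests below. The server address and the threshold values are assumptions for illustration.

import dataclasses

import temporalio.converter
from temporalio.client import Client
from temporalio.converter import PayloadLimitsConfig


async def connect_with_payload_warnings() -> Client:
    # Assumed local dev server address; replace as needed.
    client = await Client.connect("localhost:7233")
    config = client.config()
    config["data_converter"] = dataclasses.replace(
        temporalio.converter.default(),
        payload_limits=PayloadLimitsConfig(
            memo_size_warning=2 * 1024,  # warn when a memo exceeds 2 KiB
            payload_size_warning=512 * 1024,  # warn when payloads exceed 512 KiB
        ),
    )
    return Client(**config)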
temporalio/worker/_worker.py

Lines changed: 14 additions & 2 deletions
@@ -141,6 +141,7 @@ def __init__(
         nexus_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
             maximum=5
         ),
+        disable_payload_error_limit: bool = False,
     ) -> None:
         """Create a worker to process workflows and/or activities.
 
@@ -314,6 +315,14 @@ def __init__(
                 Defaults to a 5-poller maximum.
             nexus_task_poller_behavior: Specify the behavior of Nexus task polling.
                 Defaults to a 5-poller maximum.
+            disable_payload_error_limit: If true, payload and memo error limit checks
+                are disabled in the worker, allowing payloads and memos that are above
+                the server error limit to be submitted to the Temporal server. If false,
+                the worker will validate the size before submitting to the Temporal server,
+                and cause a task failure if the size limit is exceeded. The default is False.
+                See https://docs.temporal.io/troubleshooting/blob-size-limit-error for more
+                details.
+
         """
         config = WorkerConfig(
             client=client,
@@ -357,6 +366,7 @@ def __init__(
             workflow_task_poller_behavior=workflow_task_poller_behavior,
             activity_task_poller_behavior=activity_task_poller_behavior,
             nexus_task_poller_behavior=nexus_task_poller_behavior,
+            disable_payload_error_limit=disable_payload_error_limit,
         )
 
         plugins_from_client = cast(
@@ -719,10 +729,11 @@ async def _run(self):
         namespace_info = await self._bridge_worker.validate()
         payload_error_limits = (
             _PayloadErrorLimits(
-                memo_upload_error_limit=namespace_info.Limits.memo_size_limit_error,
-                payload_upload_error_limit=namespace_info.Limits.blob_size_limit_error,
+                memo_size_error=namespace_info.Limits.memo_size_limit_error,
+                payload_size_error=namespace_info.Limits.blob_size_limit_error,
             )
             if namespace_info.HasField("limits")
+            and not self._config.get("disable_payload_error_limit", False)
             else None
         )
 
@@ -930,6 +941,7 @@ class WorkerConfig(TypedDict, total=False):
     workflow_task_poller_behavior: PollerBehavior
     activity_task_poller_behavior: PollerBehavior
     nexus_task_poller_behavior: PollerBehavior
+    disable_payload_error_limit: bool
 
 
 def _warn_if_activity_executor_max_workers_is_inconsistent(
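Usage sketch for the new worker option (not part of the diff): the echo workflow, activity, and task queue name below are illustrative placeholders. Disabling the check only skips the SDK-side validation; the server's own blob-size limits still apply.

from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker


@activity.defn
async def echo_activity(value: str) -> str:
    return value


@workflow.defn
class EchoWorkflow:
    @workflow.run
    async def run(self, value: str) -> str:
        return await workflow.execute_activity(
            echo_activity, value, start_to_close_timeout=timedelta(seconds=10)
        )


async def run_worker(client: Client) -> None:
    worker = Worker(
        client,
        task_queue="payload-limits-example",  # placeholder queue name
        workflows=[EchoWorkflow],
        activities=[echo_activity],
        # Skip the worker-side payload/memo size checks; oversized data is
        # still subject to the Temporal server's own limits.
        disable_payload_error_limit=True,
    )
    await worker.run()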

tests/worker/test_workflow.py

Lines changed: 43 additions & 18 deletions
@@ -90,6 +90,7 @@
     PayloadCodec,
     PayloadConverter,
     PayloadLimitsConfig,
+    PayloadSizeWarning,
     _PayloadErrorLimits,
 )
 from temporalio.exceptions import (
@@ -8552,7 +8553,7 @@ async def test_large_payload_workflow_input_warning(client: Client):
     config["data_converter"] = dataclasses.replace(
         temporalio.converter.default(),
         payload_limits=PayloadLimitsConfig(
-            payload_upload_warning_limit=102,
+            payload_size_warning=102,
         ),
     )
     client = Client(**config)
@@ -8575,7 +8576,7 @@ async def test_large_payload_workflow_input_warning(client: Client):
         )
 
         assert len(w) == 1
-        assert issubclass(w[-1].category, UserWarning)
+        assert issubclass(w[-1].category, PayloadSizeWarning)
         assert (
             "[TMPRL1103] Attempted to upload payloads with size that exceeded the warning limit."
             in str(w[-1].message)
@@ -8586,7 +8587,7 @@ async def test_large_payload_workflow_memo_warning(client: Client):
     config = client.config()
     config["data_converter"] = dataclasses.replace(
         temporalio.converter.default(),
-        payload_limits=PayloadLimitsConfig(memo_upload_warning_limit=128),
+        payload_limits=PayloadLimitsConfig(memo_size_warning=128),
     )
     client = Client(**config)
 
@@ -8613,13 +8614,37 @@ async def test_large_payload_workflow_memo_warning(client: Client):
         )
 
         assert len(w) == 1
-        assert issubclass(w[-1].category, UserWarning)
+        assert issubclass(w[-1].category, PayloadSizeWarning)
         assert (
             "[TMPRL1103] Attempted to upload memo with size that exceeded the warning limit."
             in str(w[-1].message)
         )
 
 
+async def test_large_payload_workflow_payload_error_disabled(client: Client):
+    async with new_worker(
+        client,
+        LargePayloadWorkflow,
+        activities=[large_payload_activity],
+        # Referenced server version doesn't report payload limits.
+        # Configure error limits in server when limit reporting supported.
+        disable_payload_error_limit=True,
+    ) as worker:
+        await client.execute_workflow(
+            LargePayloadWorkflow.run,
+            LargePayloadWorkflowInput(
+                activity_input_data_size=0,
+                activity_output_data_size=0,
+                activity_exception_data_size=0,
+                workflow_output_data_size=6 * 1024,
+                data=[],
+            ),
+            id=f"workflow-{uuid.uuid4()}",
+            task_queue=worker.task_queue,
+            execution_timeout=timedelta(seconds=3),
+        )
+
+
 async def test_large_payload_workflow_result_error(client: Client):
     # Create worker runtime with forwarded logger
     worker_logger = logging.getLogger(f"log-{uuid.uuid4()}")
@@ -8642,8 +8667,8 @@ async def test_large_payload_workflow_result_error(client: Client):
         # Remove and configure error limits in server when limit reporting supported.
         data_converter=temporalio.converter.default()._with_payload_error_limits(
             _PayloadErrorLimits(
-                memo_upload_error_limit=0,
-                payload_upload_error_limit=error_limit,
+                memo_size_error=0,
+                payload_size_error=error_limit,
             )
         ),
     )
@@ -8698,7 +8723,7 @@ async def test_large_payload_workflow_result_warning(client: Client):
     config["data_converter"] = dataclasses.replace(
         temporalio.converter.default(),
         payload_limits=PayloadLimitsConfig(
-            payload_upload_warning_limit=1024,
+            payload_size_warning=1024,
         ),
     )
     worker_client = Client(**config)
@@ -8722,7 +8747,7 @@ async def test_large_payload_workflow_result_warning(client: Client):
         )
 
         assert len(w) == 1
-        assert issubclass(w[-1].category, UserWarning)
+        assert issubclass(w[-1].category, PayloadSizeWarning)
         assert (
             "[TMPRL1103] Attempted to upload payloads with size that exceeded the warning limit."
             in str(w[-1].message)
@@ -8751,8 +8776,8 @@ async def test_large_payload_activity_input_error(client: Client):
         # Remove and configure error limits in server when limit reporting supported.
         data_converter=temporalio.converter.default()._with_payload_error_limits(
             _PayloadErrorLimits(
-                memo_upload_error_limit=0,
-                payload_upload_error_limit=error_limit,
+                memo_size_error=0,
+                payload_size_error=error_limit,
             )
         ),
     )
@@ -8806,7 +8831,7 @@ async def test_large_payload_activity_input_warning(client: Client):
     config["data_converter"] = dataclasses.replace(
         temporalio.converter.default(),
         payload_limits=PayloadLimitsConfig(
-            payload_upload_warning_limit=1024,
+            payload_size_warning=1024,
         ),
     )
     worker_client = Client(**config)
@@ -8829,7 +8854,7 @@ async def test_large_payload_activity_input_warning(client: Client):
         )
 
         assert len(w) == 1
-        assert issubclass(w[-1].category, UserWarning)
+        assert issubclass(w[-1].category, PayloadSizeWarning)
         assert (
             "[TMPRL1103] Attempted to upload payloads with size that exceeded the warning limit."
             in str(w[-1].message)
@@ -8858,8 +8883,8 @@ async def test_large_payload_activity_exception_error(client: Client):
         # Remove and configure error limits in server when limit reporting supported.
         data_converter=temporalio.converter.default()._with_payload_error_limits(
             _PayloadErrorLimits(
-                memo_upload_error_limit=0,
-                payload_upload_error_limit=error_limit,
+                memo_size_error=0,
+                payload_size_error=error_limit,
             )
         ),
     )
@@ -8921,8 +8946,8 @@ async def test_large_payload_activity_result_error(client: Client):
         # Remove and configure error limits in server when limit reporting supported.
         data_converter=temporalio.converter.default()._with_payload_error_limits(
             _PayloadErrorLimits(
-                memo_upload_error_limit=0,
-                payload_upload_error_limit=error_limit,
+                memo_size_error=0,
+                payload_size_error=error_limit,
             )
         ),
     )
@@ -8970,7 +8995,7 @@ async def test_large_payload_activity_result_warning(client: Client):
     config["data_converter"] = dataclasses.replace(
         temporalio.converter.default(),
         payload_limits=PayloadLimitsConfig(
-            payload_upload_warning_limit=1024,
+            payload_size_warning=1024,
        ),
     )
     worker_client = Client(**config)
@@ -8993,7 +9018,7 @@ async def test_large_payload_activity_result_warning(client: Client):
         )
 
         assert len(w) == 1
-        assert issubclass(w[-1].category, UserWarning)
+        assert issubclass(w[-1].category, PayloadSizeWarning)
         assert (
             "[TMPRL1103] Attempted to upload payloads with size that exceeded the warning limit."
             in str(w[-1].message)
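Because PayloadSizeWarning is now a dedicated RuntimeWarning subclass rather than a plain UserWarning, callers can filter or escalate it specifically. Below is a small sketch using only the standard warnings module; escalating it to an error is an assumption about usage, not something this change prescribes.

import warnings

from temporalio.converter import PayloadSizeWarning

# Option 1: escalate the SDK's size warnings into hard errors (e.g. in CI).
warnings.filterwarnings("error", category=PayloadSizeWarning)

# Option 2: record them for inspection, as the updated tests above do.
with warnings.catch_warnings(record=True) as recorded:
    warnings.simplefilter("always")
    ...  # start or execute a workflow whose payloads exceed the configured thresholds

size_hits = [w for w in recorded if issubclass(w.category, PayloadSizeWarning)]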
