Skip to content

Commit b2f8b9f

Browse files
authored
Make SerializedParam.resolve not raise by default (apache#56923)
1 parent 44f8479 commit b2f8b9f

File tree

2 files changed

+27
-58
lines changed

2 files changed

+27
-58
lines changed

airflow-core/src/airflow/serialization/definitions/param.py

Lines changed: 24 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -20,78 +20,50 @@
2020

2121
import collections.abc
2222
import copy
23-
import json
2423
from typing import TYPE_CHECKING, Any
2524

26-
from airflow.exceptions import ParamValidationError
2725
from airflow.serialization.definitions.notset import NOTSET, ArgNotSet
2826

2927
if TYPE_CHECKING:
3028
from collections.abc import Iterator, Mapping
3129

3230

33-
def _check_json(value):
34-
try:
35-
json.dumps(value)
36-
except Exception:
37-
raise ParamValidationError(
38-
f"All provided parameters must be JSON-serializable. The value '{value}' is not."
39-
)
40-
41-
4231
class SerializedParam:
43-
"""Server-side Param class for deserialization."""
32+
"""Server-side param class for deserialization."""
4433

45-
def __init__(self, default: Any = None, description: str | None = None, **schema):
34+
def __init__(self, default: Any = NOTSET, description: str | None = None, **schema):
4635
# No validation needed - the SDK already validated the default.
4736
self.value = default
4837
self.description = description
4938
self.schema = schema
5039

51-
def resolve(self, value: Any = NOTSET, suppress_exception: bool = False) -> Any:
40+
def resolve(self, *, raises: bool = False) -> Any:
5241
"""
53-
Run the validations and returns the Param's final value.
42+
Run the validations and returns the param's final value.
5443
55-
May raise ValueError on failed validations, or TypeError
56-
if no value is passed and no value already exists.
57-
We first check that value is json-serializable; if not, warn.
58-
In future release we will require the value to be json-serializable.
44+
Different from SDK Param, this function never raises by default. *None*
45+
is returned if validation fails, no value is available, or the return
46+
value is not JSON-serializable.
5947
60-
:param value: The value to be updated for the Param
61-
:param suppress_exception: To raise an exception or not when validation
62-
fails. If true and validations fails, *None* is returned.
48+
:param raises: All exceptions during validation are suppressed by
49+
default. They are only raised if this is set to *True* instead.
6350
"""
6451
import jsonschema
65-
from jsonschema import FormatChecker
66-
from jsonschema.exceptions import ValidationError
6752

68-
if not isinstance(value, ArgNotSet):
69-
try:
70-
_check_json(value)
71-
except ParamValidationError:
72-
if suppress_exception:
73-
return None
74-
raise
75-
final_val = value
76-
elif isinstance(self.value, ArgNotSet):
77-
if suppress_exception:
78-
return None
79-
raise ParamValidationError("No value passed and Param has no default value")
80-
else:
81-
final_val = self.value
8253
try:
83-
jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
84-
except ValidationError as err:
85-
if suppress_exception:
54+
if isinstance(value := self.value, ArgNotSet):
55+
raise ValueError("No value passed")
56+
jsonschema.validate(value, self.schema, format_checker=jsonschema.FormatChecker())
57+
except Exception:
58+
if not raises:
8659
return None
87-
raise ParamValidationError(err) from None
88-
self.value = final_val
89-
return final_val
60+
raise
61+
return value
9062

9163
def dump(self) -> dict[str, Any]:
9264
"""Return the full param spec for API consumers."""
9365
return {
94-
"value": None if isinstance(self.value, ArgNotSet) else self.value,
66+
"value": self.resolve(),
9567
"schema": self.schema,
9668
"description": self.description,
9769
}
@@ -115,12 +87,11 @@ class SerializedParamsDict(collections.abc.Mapping[str, Any]):
11587

11688
__dict: dict[str, SerializedParam]
11789

118-
def __init__(self, d: Mapping[str, Any] | None = None, *, suppress_exception: bool = False) -> None:
90+
def __init__(self, d: Mapping[str, Any] | None = None) -> None:
11991
self.__dict = dict(_collect_params(d))
120-
self.suppress_exception = suppress_exception
12192

12293
def __eq__(self, other: Any) -> bool:
123-
"""Compare ParamsDict objects using their dumped content, matching SDK behavior."""
94+
"""Compare params dicts using their dumped content, matching SDK behavior."""
12495
if hasattr(other, "dump"): # ParamsDict or SerializedParamsDict
12596
return self.dump() == other.dump()
12697
if isinstance(other, collections.abc.Mapping):
@@ -155,19 +126,19 @@ def values(self):
155126
return collections.abc.ValuesView(self.__dict)
156127

157128
def validate(self) -> dict[str, Any]:
158-
"""Validate & returns all the Params object stored in the dictionary."""
129+
"""Validate & returns all the params stored in the dictionary."""
159130

160131
def _validate_one(k: str, v: SerializedParam):
161132
try:
162-
return v.resolve(suppress_exception=self.suppress_exception)
163-
except ParamValidationError as e:
164-
raise ParamValidationError(f"Invalid input for param {k}: {e}") from None
133+
return v.resolve(raises=True)
134+
except Exception as e:
135+
raise ValueError(f"Invalid input for param {k}: {e}") from None
165136

166137
return {k: _validate_one(k, v) for k, v in self.__dict.items()}
167138

168139
def dump(self) -> Mapping[str, Any]:
169140
"""Dump the resolved values as a mapping."""
170-
return {k: v.resolve(suppress_exception=True) for k, v in self.__dict.items()}
141+
return {k: v.resolve() for k, v in self.__dict.items()}
171142

172143
def deep_merge(self, data: Mapping[str, Any] | None) -> SerializedParamsDict:
173144
"""Create a new params dict by merging incoming data into this params dict."""

airflow-core/tests/unit/models/test_dag.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from airflow._shared.timezones.timezone import datetime as datetime_tz
3939
from airflow.configuration import conf
4040
from airflow.dag_processing.dagbag import DagBag
41-
from airflow.exceptions import AirflowException, ParamValidationError
41+
from airflow.exceptions import AirflowException
4242
from airflow.models.asset import (
4343
AssetAliasModel,
4444
AssetDagRunQueue,
@@ -1733,7 +1733,7 @@ def test_next_dagrun_info_on_29_feb(self):
17331733

17341734
def test_validate_params_on_trigger_dag(self, testing_dag_bundle):
17351735
dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
1736-
with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"):
1736+
with pytest.raises(ValueError, match="No value passed"):
17371737
sync_dag_to_db(dag).create_dagrun(
17381738
run_id="test_dagrun_missing_param",
17391739
run_type=DagRunType.MANUAL,
@@ -1745,9 +1745,7 @@ def test_validate_params_on_trigger_dag(self, testing_dag_bundle):
17451745
)
17461746

17471747
dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")})
1748-
with pytest.raises(
1749-
ParamValidationError, match="Invalid input for param param1: None is not of type 'string'"
1750-
):
1748+
with pytest.raises(ValueError, match="None is not of type 'string'"):
17511749
sync_dag_to_db(dag).create_dagrun(
17521750
run_id="test_dagrun_missing_param",
17531751
run_type=DagRunType.MANUAL,

0 commit comments

Comments
 (0)