88import pathlib
99from collections .abc import Mapping , Sequence
1010from datetime import timedelta
11- from typing import Any , Final , overload
11+ from typing import Any , Final , TypeGuard , overload
1212
1313from frequenz .channels import Broadcast , Receiver
1414from frequenz .channels .experimental import WithPrevious
15+ from marshmallow import Schema , ValidationError
1516
1617from ._managing_actor import ConfigManagingActor
18+ from ._util import DataclassT , load_config
1719
1820_logger = logging .getLogger (__name__ )
1921
@@ -106,6 +108,21 @@ async def new_receiver(
106108 skip_unchanged : bool = True ,
107109 ) -> Receiver [Mapping [str , Any ]]: ...
108110
111+ @overload
112+ async def new_receiver ( # pylint: disable=too-many-arguments
113+ self ,
114+ * ,
115+ wait_for_first : bool = True ,
116+ skip_unchanged : bool = True ,
117+ # We need to specify the key here because we have kwargs, so if it is not
118+ # present is not considered None as the only possible value, as any value can be
119+ # accepted as part of the kwargs.
120+ key : None = None ,
121+ schema : type [DataclassT ],
122+ base_schema : type [Schema ] | None = None ,
123+ ** marshmallow_load_kwargs : Any ,
124+ ) -> Receiver [DataclassT ]: ...
125+
109126 @overload
110127 async def new_receiver (
111128 self ,
@@ -115,14 +132,34 @@ async def new_receiver(
115132 key : str ,
116133 ) -> Receiver [Mapping [str , Any ] | None ]: ...
117134
135+ @overload
136+ async def new_receiver ( # pylint: disable=too-many-arguments
137+ self ,
138+ * ,
139+ wait_for_first : bool = True ,
140+ skip_unchanged : bool = True ,
141+ key : str ,
142+ schema : type [DataclassT ],
143+ base_schema : type [Schema ] | None ,
144+ ** marshmallow_load_kwargs : Any ,
145+ ) -> Receiver [DataclassT | None ]: ...
146+
118147 # The noqa DOC502 is needed because we raise TimeoutError indirectly.
119148 async def new_receiver ( # pylint: disable=too-many-arguments # noqa: DOC502
120149 self ,
121150 * ,
122151 wait_for_first : bool = False ,
123152 skip_unchanged : bool = True ,
124153 key : str | None = None ,
125- ) -> Receiver [Mapping [str , Any ]] | Receiver [Mapping [str , Any ] | None ]:
154+ schema : type [DataclassT ] | None = None ,
155+ base_schema : type [Schema ] | None = None ,
156+ ** marshmallow_load_kwargs : Any ,
157+ ) -> (
158+ Receiver [Mapping [str , Any ]]
159+ | Receiver [Mapping [str , Any ] | None ]
160+ | Receiver [DataclassT ]
161+ | Receiver [DataclassT | None ]
162+ ):
126163 """Create a new receiver for the configuration.
127164
128165 This method has a lot of features and functionalities to make it easier to
@@ -147,6 +184,26 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
147184 otherwise only the part of the configuration under the specified key is
148185 received, or `None` if the key is not found.
149186
187+ ### Schema validation
188+
189+ The configuration is received as a dictionary unless a `schema` is provided. In
190+ this case, the configuration will be validated against the schema and received
191+ as an instance of the configuration class.
192+
193+ The configuration `schema` class is expected to be
194+ a [`dataclasses.dataclass`][], which is used to create
195+ a [`marshmallow.Schema`][] schema to validate the configuration dictionary.
196+
197+ To customize the schema derived from the configuration dataclass, you can
198+ use [`marshmallow_dataclass.dataclass`][] to specify extra metadata.
199+
200+ Configurations that don't pass the validation will be ignored and not sent to
201+ the receiver, but an error will be logged. Errors other than `ValidationError`
202+ will not be handled and will be raised.
203+
204+ Additional arguments can be passed to [`marshmallow.Schema.load`][] using keyword
205+ arguments.
206+
150207 ### Waiting for the first configuration
151208
152209 If `wait_for_first` is `True`, the receiver will wait for the first
@@ -174,6 +231,13 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
174231 changed compared to the last one received.
175232 key: The key to filter the configuration. If `None`, the full configuration
176233 will be received.
234+ schema: The type of the configuration. If provided, the configuration
235+ will be validated against this type.
236+ base_schema: An optional class to be used as a base schema for the
237+ configuration class. This allow using custom fields for example. Will be
238+ passed to [`marshmallow_dataclass.class_schema`][].
239+ **marshmallow_load_kwargs: Additional arguments to be passed to
240+ [`marshmallow.Schema.load`][].
177241
178242 Returns:
179243 The receiver for the configuration.
@@ -182,6 +246,79 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
182246 asyncio.TimeoutError: If `wait_for_first` is `True` and the first
183247 configuration can't be received in time.
184248 """
249+ # All supporting generic function (using DataclassT) need to be nested
250+ # here. For some reasons mypy has trouble if these functions are
251+ # global, it consider the DataclassT used by this method and the global
252+ # functions to be different, leading to very hard to find typing
253+ # errors.
254+
255+ @overload
256+ def _load_config_with_logging (
257+ config : Mapping [str , Any ],
258+ schema : type [DataclassT ],
259+ * ,
260+ key : None = None ,
261+ base_schema : type [Schema ] | None = None ,
262+ ** marshmallow_load_kwargs : Any ,
263+ ) -> DataclassT | _InvalidConfig : ...
264+
265+ @overload
266+ def _load_config_with_logging (
267+ config : Mapping [str , Any ],
268+ schema : type [DataclassT ],
269+ * ,
270+ key : str ,
271+ base_schema : type [Schema ] | None = None ,
272+ ** marshmallow_load_kwargs : Any ,
273+ ) -> DataclassT | None | _InvalidConfig : ...
274+
275+ def _load_config_with_logging (
276+ config : Mapping [str , Any ],
277+ schema : type [DataclassT ],
278+ * ,
279+ key : str | None = None ,
280+ base_schema : type [Schema ] | None = None ,
281+ ** marshmallow_load_kwargs : Any ,
282+ ) -> DataclassT | None | _InvalidConfig :
283+ """Try to load a configuration and log any validation errors."""
284+ if key is not None :
285+ maybe_config = config .get (key , None )
286+ if maybe_config is None :
287+ _logger .debug (
288+ "Configuration key %s not found, sending None: config=%r" ,
289+ key ,
290+ config ,
291+ )
292+ return None
293+ config = maybe_config
294+
295+ try :
296+ return load_config (
297+ schema , config , base_schema = base_schema , ** marshmallow_load_kwargs
298+ )
299+ except ValidationError as err :
300+ key_str = ""
301+ if key :
302+ key_str = f" for key '{ key } '"
303+ _logger .error (
304+ "The configuration%s is invalid, the configuration update will be skipped: %s" ,
305+ key_str ,
306+ err ,
307+ )
308+ return _INVALID_CONFIG
309+
310+ def _is_valid_or_none (
311+ config : DataclassT | _InvalidConfig | None ,
312+ ) -> TypeGuard [DataclassT | None ]:
313+ """Return whether the configuration is valid or `None`."""
314+ return config is not _INVALID_CONFIG
315+
316+ def _is_valid (
317+ config : DataclassT | _InvalidConfig ,
318+ ) -> TypeGuard [DataclassT ]:
319+ """Return whether the configuration is valid and not `None`."""
320+ return config is not _INVALID_CONFIG
321+
185322 recv_name = f"{ self } _receiver" if key is None else f"{ self } _receiver_{ key } "
186323 receiver = self .config_channel .new_receiver (name = recv_name , limit = 1 )
187324
@@ -192,10 +329,40 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
192329 async with asyncio .timeout (self .wait_for_first_timeout .total_seconds ()):
193330 await receiver .ready ()
194331
195- if key is None :
196- return receiver
197-
198- return receiver .map (lambda config : config .get (key ))
332+ match (key , schema ):
333+ case (None , None ):
334+ return receiver
335+ case (None , type ()):
336+ return receiver .map (
337+ lambda config : _load_config_with_logging (
338+ config ,
339+ schema ,
340+ # we need to pass it explicitly because of the
341+ # variadic keyword arguments, otherwise key
342+ # could be included in marshmallow_load_kwargs
343+ # with a value different than None.
344+ key = None ,
345+ base_schema = base_schema ,
346+ ** marshmallow_load_kwargs ,
347+ )
348+ ).filter (_is_valid )
349+ case (str (), None ):
350+ return receiver .map (lambda config : config .get (key ))
351+ case (str (), type ()):
352+ return receiver .map (
353+ lambda config : _load_config_with_logging (
354+ config ,
355+ schema ,
356+ key = key ,
357+ base_schema = base_schema ,
358+ ** marshmallow_load_kwargs ,
359+ )
360+ ).filter (_is_valid_or_none )
361+ case unexpected :
362+ # We can't use `assert_never` here because `mypy` is
363+ # having trouble
364+ # narrowing the types of a tuple.
365+ assert False , f"Unexpected match: { unexpected } "
199366
200367
201368class _NotEqualWithLogging :
@@ -244,3 +411,11 @@ def __call__(
244411 _logger .debug ("Old configuration%s being kept: %r" , key_str , old_config )
245412
246413 return has_changed
414+
415+
416+ class _InvalidConfig :
417+ """A sentinel to represent an invalid configuration."""
418+
419+
420+ _INVALID_CONFIG = _InvalidConfig ()
421+ """A sentinel singleton instance to represent an invalid configuration."""
0 commit comments