88import pathlib
99from collections .abc import Mapping , MutableMapping , Sequence , Set
1010from datetime import timedelta
11- from typing import Any , assert_never , overload
11+ from typing import Any , TypeGuard , assert_never , cast , overload
1212
1313from frequenz .channels import Broadcast , Receiver
1414from frequenz .channels .file_watcher import EventType
15+ from marshmallow import Schema , ValidationError
1516
1617from ._actor import ConfigManagingActor
18+ from ._util import DataclassT , load_config
1719
1820_logger = logging .getLogger (__name__ )
1921
@@ -128,6 +130,21 @@ async def new_receiver(
128130 skip_unchanged : bool = True ,
129131 ) -> Receiver [Mapping [str , Any ]]: ...
130132
133+ @overload
134+ async def new_receiver ( # pylint: disable=too-many-arguments
135+ self ,
136+ * ,
137+ wait_for_first : bool = True ,
138+ skip_unchanged : bool = True ,
139+ # We need to specify the key here because we have kwargs, so if it is not
140+ # present is not considered None as the only possible value, as any value can be
141+ # accepted as part of the kwargs.
142+ key : None = None ,
143+ schema : type [DataclassT ],
144+ base_schema : type [Schema ] | None = None ,
145+ ** marshmallow_load_kwargs : Any ,
146+ ) -> Receiver [DataclassT ]: ...
147+
131148 @overload
132149 async def new_receiver (
133150 self ,
@@ -137,14 +154,29 @@ async def new_receiver(
137154 key : str ,
138155 ) -> Receiver [Mapping [str , Any ] | None ]: ...
139156
157+ @overload
158+ async def new_receiver ( # pylint: disable=too-many-arguments
159+ self ,
160+ * ,
161+ wait_for_first : bool = True ,
162+ skip_unchanged : bool = True ,
163+ key : str ,
164+ schema : type [DataclassT ],
165+ base_schema : type [Schema ] | None ,
166+ ** marshmallow_load_kwargs : Any ,
167+ ) -> Receiver [DataclassT | None ]: ...
168+
140169 # The noqa DOC502 is needed because we raise TimeoutError indirectly.
141170 async def new_receiver ( # pylint: disable=too-many-arguments # noqa: DOC502
142171 self ,
143172 * ,
144173 wait_for_first : bool = False ,
145174 skip_unchanged : bool = True ,
146175 key : str | None = None ,
147- ) -> Receiver [Mapping [str , Any ] | None ]:
176+ schema : type [DataclassT ] | None = None ,
177+ base_schema : type [Schema ] | None = None ,
178+ ** marshmallow_load_kwargs : Any ,
179+ ) -> Receiver [Mapping [str , Any ] | DataclassT | None ]:
148180 """Create a new receiver for the configuration.
149181
150182 This method has a lot of features and functionalities to make it easier to
@@ -168,6 +200,26 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
168200 configuration under the specified key is received, or `None` if the key is not
169201 found.
170202
203+ ### Schema validation
204+
205+ The configuration is received as a dictionary unless a `schema` is provided. In
206+ this case, the configuration will be validated against the schema and received
207+ as an instance of the configuration class.
208+
209+ The configuration `schema` class is expected to be
210+ a [`dataclasses.dataclass`][], which is used to create
211+ a [`marshmallow.Schema`][] schema to validate the configuration dictionary.
212+
213+ To customize the schema derived from the configuration dataclass, you can
214+ use [`marshmallow_dataclass.dataclass`][] to specify extra metadata.
215+
216+ Configurations that don't pass the validation will be ignored and not sent to
217+ the receiver, but an error will be logged. Errors other than `ValidationError`
218+ will not be handled and will be raised.
219+
220+ Additional arguments can be passed to [`marshmallow.Schema.load`][] using keyword
221+ arguments.
222+
171223 ### Waiting for the first configuration
172224
173225 If `wait_for_first` is `True`, the receiver will wait for the first
@@ -196,6 +248,13 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
196248 key: The key to filter the configuration. A nested key can be specified by
197249 using a dot (`.`) as separator. For example "key.subkey" will get only
198250 `config[key][subkey]` If `None`, all configurations will be received.
251+ schema: The type of the configuration. If provided, the configuration
252+ will be validated against this type.
253+ base_schema: An optional class to be used as a base schema for the
254+ configuration class. This allow using custom fields for example. Will be
255+ passed to [`marshmallow_dataclass.class_schema`][].
256+ **marshmallow_load_kwargs: Additional arguments to be passed to
257+ [`marshmallow.Schema.load`][].
199258
200259 Returns:
201260 The receiver for the configuration.
@@ -214,8 +273,36 @@ async def new_receiver( # pylint: disable=too-many-arguments # noqa: DOC502
214273 lambda config : self ._config_has_changed (config , key )
215274 )
216275
217- if key is not None :
218- receiver = receiver .map (lambda config : config .get (key ))
276+ match (key , schema ):
277+ case (None , None ):
278+ pass
279+ case (None , schema ):
280+ # This cast can be removed after frequenz-channels is upgraded to v1.3.0
281+ receiver = cast (
282+ Receiver [DataclassT ],
283+ receiver .map (
284+ lambda config : _load_config_with_logging (
285+ config ,
286+ schema ,
287+ base_schema = base_schema ,
288+ ** marshmallow_load_kwargs ,
289+ )
290+ ).filter (_is_valid_and_not_none ),
291+ )
292+ case (key , None ):
293+ receiver = receiver .map (lambda config : config .get (key ))
294+ case (key , schema ):
295+ # This cast can be removed after frequenz-channels is upgraded to v1.3.0
296+ receiver = cast (
297+ Receiver [DataclassT | None ],
298+ receiver .map (
299+ lambda config : load_config (
300+ schema , config .get (key , {}), ** marshmallow_load_kwargs
301+ )
302+ ).filter (_is_valid_or_none ),
303+ )
304+ case unexpected :
305+ assert_never (unexpected )
219306
220307 if wait_for_first :
221308 async with asyncio .timeout (self .wait_for_first_timeout .total_seconds ()):
@@ -254,3 +341,78 @@ def _config_has_changed(
254341 _logger .debug ("Old configuration%s being kept: %r" , key_str , old_config )
255342
256343 return has_changed
344+
345+
346+ class _InvalidConfig :
347+ """A sentinel to represent an invalid configuration."""
348+
349+
350+ _INVALID_CONFIG = _InvalidConfig ()
351+ """A sentinel singleton instance to represent an invalid configuration."""
352+
353+
354+ @overload
355+ def _load_config_with_logging (
356+ config : Mapping [str , Any ],
357+ schema : type [DataclassT ],
358+ key : str ,
359+ base_schema : type [Schema ] | None = None ,
360+ ** marshmallow_load_kwargs : Any ,
361+ ) -> DataclassT | None | _InvalidConfig : ...
362+
363+
364+ @overload
365+ def _load_config_with_logging (
366+ config : Mapping [str , Any ],
367+ schema : type [DataclassT ],
368+ key : None = None ,
369+ base_schema : type [Schema ] | None = None ,
370+ ** marshmallow_load_kwargs : Any ,
371+ ) -> DataclassT | _InvalidConfig : ...
372+
373+
374+ def _load_config_with_logging (
375+ config : Mapping [str , Any ],
376+ schema : type [DataclassT ],
377+ key : str | None = None ,
378+ base_schema : type [Schema ] | None = None ,
379+ ** marshmallow_load_kwargs : Any ,
380+ ) -> DataclassT | None | _InvalidConfig :
381+ """Try to load a configuration and log any validation errors."""
382+ if key is not None :
383+ maybe_config = config .get (key , None )
384+ if maybe_config is None :
385+ _logger .debug (
386+ "Configuration key %s not found, sending None: config=%r" , key , config
387+ )
388+ return None
389+ config = maybe_config
390+
391+ try :
392+ return load_config (
393+ schema , config , base_schema = base_schema , ** marshmallow_load_kwargs
394+ )
395+ except ValidationError as err :
396+ key_str = ""
397+ if key :
398+ key_str = f" for key '{ key } '"
399+ _logger .error (
400+ "The configuration%s is invalid, the configuration update will be skipped: %s" ,
401+ key_str ,
402+ err ,
403+ )
404+ return _INVALID_CONFIG
405+
406+
407+ def _is_valid_or_none (
408+ config : DataclassT | _InvalidConfig | None ,
409+ ) -> TypeGuard [DataclassT | None ]:
410+ """Return whether the configuration is valid or `None`."""
411+ return config is not _INVALID_CONFIG
412+
413+
414+ def _is_valid_and_not_none (
415+ config : DataclassT | _InvalidConfig | None ,
416+ ) -> TypeGuard [DataclassT ]:
417+ """Return whether the configuration is valid and not `None`."""
418+ return config is not None and config is not _INVALID_CONFIG
0 commit comments