1111
1212from frequenz .channels import Broadcast , Receiver
1313from frequenz .channels .experimental import WithPrevious
14+ from marshmallow import Schema , ValidationError
1415from typing_extensions import override
1516
1617from ..actor ._background_service import BackgroundService
1718from ._managing_actor import ConfigManagingActor
19+ from ._util import DataclassT , load_config
1820
1921_logger = logging .getLogger (__name__ )
2022
@@ -124,18 +126,21 @@ def __repr__(self) -> str:
124126 """Return a string representation of this config manager."""
125127 return f"config_channel={ self .config_channel !r} , " f"actor={ self .actor !r} >"
126128
127- def new_receiver (
129+ def new_receiver ( # pylint: disable=too-many-arguments
128130 self ,
129131 # This is tricky, because a str is also a Sequence[str], if we would use only
130132 # Sequence[str], then a regular string would also be accepted and taken as
131133 # a sequence, like "key" -> ["k", "e", "y"]. We should never remove the str from
132134 # the allowed types without changing Sequence[str] to something more specific,
133135 # like list[str] or tuple[str] (but both have their own problems).
134136 key : str | Sequence [str ],
137+ config_class : type [DataclassT ],
135138 / ,
136139 * ,
137140 skip_unchanged : bool = True ,
138- ) -> Receiver [Mapping [str , Any ] | InvalidValueForKeyError | None ]:
141+ base_schema : type [Schema ] | None = None ,
142+ marshmallow_load_kwargs : dict [str , Any ] | None = None ,
143+ ) -> Receiver [DataclassT | Exception | None ]:
139144 """Create a new receiver for receiving the configuration for a particular key.
140145
141146 This method has a lot of features and functionalities to make it easier to
@@ -157,6 +162,21 @@ def new_receiver(
157162 will be logged and a [`frequenz.sdk.config.InvalidValueForKeyError`][] instance
158163 will be sent to the receiver.
159164
165+ ### Schema validation
166+
167+ The raw configuration received as a `Mapping` will be validated and loaded to
168+ as a `config_class`. The `config_class` class is expected to be
169+ a [`dataclasses.dataclass`][], which is used to create
170+ a [`marshmallow.Schema`][] via the [`marshmallow_dataclass.class_schema`][]
171+ function.
172+
173+ This means you can customize the schema derived from the configuration
174+ dataclass using [`marshmallow_dataclass`][] to specify extra validation and
175+ options via field metadata.
176+
177+ Additional arguments can be passed to [`marshmallow.Schema.load`][] using
178+ the `marshmallow_load_kwargs` keyword arguments.
179+
160180 ### Skipping superfluous updates
161181
162182 If there is a burst of configuration updates, the receiver will only receive the
@@ -167,6 +187,18 @@ def new_receiver(
167187 The comparison is done using the *raw* `dict` to determine if the configuration
168188 has changed.
169189
190+ ### Error handling
191+
192+ The value under `key` must be another mapping, otherwise an error
193+ will be logged and a [`frequenz.sdk.config.InvalidValueForKeyError`][] instance
194+ will be sent to the receiver.
195+
196+ Configurations that don't pass the validation will be logged as an error and
197+ the [`ValidationError`][marshmallow.ValidationError] sent to the receiver.
198+
199+ Any other unexpected error raised during the configuration loading will be
200+ logged as an error and the error instance sent to the receiver.
201+
170202 Example:
171203 ```python
172204 # TODO: Add Example
@@ -175,44 +207,49 @@ def new_receiver(
175207 Args:
176208 key: The configuration key to be read by the receiver. If a sequence of
177209 strings is used, it is used as a sub-key.
210+ config_class: The class object to use to instantiate a configuration. The
211+ configuration will be validated against this type too using
212+ [`marshmallow_dataclass`][].
178213 skip_unchanged: Whether to skip sending the configuration if it hasn't
179214 changed compared to the last one received.
215+ base_schema: An optional class to be used as a base schema for the
216+ configuration class. This allow using custom fields for example. Will be
217+ passed to [`marshmallow_dataclass.class_schema`][].
218+ marshmallow_load_kwargs: Additional arguments to be passed to
219+ [`marshmallow.Schema.load`][].
180220
181221 Returns:
182222 The receiver for the configuration.
183223 """
184- recv_name = key if isinstance (key , str ) else ":" .join (key )
185- receiver = self .config_channel .new_receiver (name = recv_name , limit = 1 )
186-
187- def _get_key_or_error (
188- config : Mapping [str , Any ]
189- ) -> Mapping [str , Any ] | InvalidValueForKeyError | None :
190- try :
191- return _get_key (config , key )
192- except InvalidValueForKeyError as error :
193- return error
194-
195- key_receiver = receiver .map (_get_key_or_error )
224+ receiver = self .config_channel .new_receiver (name = f"{ self } :{ key } " , limit = 1 ).map (
225+ lambda config : _load_config_with_logging_and_errors (
226+ config ,
227+ config_class ,
228+ key = key ,
229+ base_schema = base_schema ,
230+ marshmallow_load_kwargs = marshmallow_load_kwargs ,
231+ )
232+ )
196233
197234 if skip_unchanged :
198235 # For some reason the type argument for WithPrevious is not inferred
199236 # correctly, so we need to specify it explicitly.
200- return key_receiver .filter (
201- WithPrevious [Mapping [ str , Any ] | InvalidValueForKeyError | None ](
237+ return receiver .filter (
238+ WithPrevious [DataclassT | Exception | None ](
202239 lambda old , new : _not_equal_with_logging (
203240 key = key , old_value = old , new_value = new
204241 )
205242 )
206243 )
207244
208- return key_receiver
245+ return receiver
209246
210247
211248def _not_equal_with_logging (
212249 * ,
213250 key : str | Sequence [str ],
214- old_value : Mapping [ str , Any ] | InvalidValueForKeyError | None ,
215- new_value : Mapping [ str , Any ] | InvalidValueForKeyError | None ,
251+ old_value : DataclassT | Exception | None ,
252+ new_value : DataclassT | Exception | None ,
216253) -> bool :
217254 """Return whether the two mappings are not equal, logging if they are the same."""
218255 if old_value == new_value :
@@ -234,6 +271,47 @@ def _not_equal_with_logging(
234271 return True
235272
236273
274+ def _load_config_with_logging_and_errors (
275+ config : Mapping [str , Any ],
276+ config_class : type [DataclassT ],
277+ * ,
278+ key : str | Sequence [str ],
279+ base_schema : type [Schema ] | None = None ,
280+ marshmallow_load_kwargs : dict [str , Any ] | None = None ,
281+ ) -> DataclassT | Exception | None :
282+ """Load the configuration for the specified key, logging errors and returning them."""
283+ try :
284+ sub_config = _get_key (config , key )
285+ if sub_config is None :
286+ _logger .debug ("Configuration key %r not found, sending None" , key )
287+ return None
288+
289+ loaded_config = load_config (
290+ config_class ,
291+ sub_config ,
292+ base_schema = base_schema ,
293+ marshmallow_load_kwargs = marshmallow_load_kwargs ,
294+ )
295+ _logger .debug ("Received new configuration: %s" , loaded_config )
296+ return loaded_config
297+ except InvalidValueForKeyError as error :
298+ if len (key ) > 1 and key != error .key :
299+ _logger .error ("Error when looking for sub-key %r: %s" , key , error )
300+ else :
301+ _logger .error (str (error ))
302+ return error
303+ except ValidationError as error :
304+ _logger .error ("The configuration for key %r is invalid: %s" , key , error )
305+ return error
306+ except Exception as error : # pylint: disable=broad-except
307+ _logger .exception (
308+ "An unexpected error occurred while loading the configuration for key %r: %s" ,
309+ key ,
310+ error ,
311+ )
312+ return error
313+
314+
237315def _get_key (
238316 config : Mapping [str , Any ],
239317 # This is tricky, because a str is also a Sequence[str], if we would use only
0 commit comments