|
| 1 | +# License: MIT |
| 2 | +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH |
| 3 | + |
| 4 | +"""Read and update logging severity from config.""" |
| 5 | + |
| 6 | +import logging |
| 7 | +from collections.abc import Mapping |
| 8 | +from dataclasses import field |
| 9 | +from typing import Annotated, Any, Self, cast |
| 10 | + |
| 11 | +import marshmallow |
| 12 | +import marshmallow.validate |
| 13 | +from frequenz.channels import Receiver |
| 14 | +from marshmallow import RAISE |
| 15 | +from marshmallow_dataclass import class_schema, dataclass |
| 16 | + |
| 17 | +from frequenz.sdk.actor import Actor |
| 18 | + |
| 19 | +_logger = logging.getLogger(__name__) |
| 20 | + |
| 21 | +LogLevel = Annotated[ |
| 22 | + str, |
| 23 | + marshmallow.fields.String( |
| 24 | + validate=marshmallow.validate.OneOf(choices=logging.getLevelNamesMapping()) |
| 25 | + ), |
| 26 | +] |
| 27 | + |
| 28 | + |
| 29 | +@dataclass |
| 30 | +class LoggerConfig: |
| 31 | + """A configuration for a logger.""" |
| 32 | + |
| 33 | + level: LogLevel = field( |
| 34 | + default="NOTSET", |
| 35 | + metadata={ |
| 36 | + "metadata": { |
| 37 | + "description": "Log level for the logger. Uses standard logging levels." |
| 38 | + }, |
| 39 | + "required": False, |
| 40 | + }, |
| 41 | + ) |
| 42 | + """The log level for the logger.""" |
| 43 | + |
| 44 | + |
| 45 | +@dataclass |
| 46 | +class LoggingConfig: |
| 47 | + """A configuration for the logging system.""" |
| 48 | + |
| 49 | + root_logger: LoggerConfig = field( |
| 50 | + default_factory=LoggerConfig, |
| 51 | + metadata={ |
| 52 | + "metadata": { |
| 53 | + "description": "Default default configuration for all loggers.", |
| 54 | + }, |
| 55 | + "required": False, |
| 56 | + }, |
| 57 | + ) |
| 58 | + """The default log level.""" |
| 59 | + |
| 60 | + loggers: dict[str, LoggerConfig] = field( |
| 61 | + default_factory=dict, |
| 62 | + metadata={ |
| 63 | + "metadata": { |
| 64 | + "description": "Configuration for a logger (the key is the logger name)." |
| 65 | + }, |
| 66 | + "required": False, |
| 67 | + }, |
| 68 | + ) |
| 69 | + """The list of loggers configurations.""" |
| 70 | + |
| 71 | + @classmethod |
| 72 | + def load(cls, configs: Mapping[str, Any]) -> Self: # noqa: DOC502 |
| 73 | + """Load and validate configs from a dictionary. |
| 74 | +
|
| 75 | + Args: |
| 76 | + configs: The configuration to validate. |
| 77 | +
|
| 78 | + Returns: |
| 79 | + The configuration if they are valid. |
| 80 | +
|
| 81 | + Raises: |
| 82 | + ValidationError: if the configuration are invalid. |
| 83 | + """ |
| 84 | + schema = class_schema(cls)() |
| 85 | + return cast(Self, schema.load(configs, unknown=RAISE)) |
| 86 | + |
| 87 | + |
| 88 | +class LoggingConfigUpdater(Actor): |
| 89 | + """Actor that listens for logging configuration changes and sets them. |
| 90 | +
|
| 91 | + Example: |
| 92 | + `config.toml` file: |
| 93 | + ```toml |
| 94 | + [logging.root_logger] |
| 95 | + level = "INFO" |
| 96 | +
|
| 97 | + [logging.loggers."frequenz.sdk.actor.power_distributing"] |
| 98 | + level = "DEBUG" |
| 99 | +
|
| 100 | + [logging.loggers."frequenz.channels"] |
| 101 | + level = "DEBUG" |
| 102 | + ``` |
| 103 | +
|
| 104 | + ```python |
| 105 | + import asyncio |
| 106 | + from collections.abc import Mapping |
| 107 | + from typing import Any |
| 108 | +
|
| 109 | + from frequenz.channels import Broadcast |
| 110 | + from frequenz.sdk.config import LoggingConfigUpdater, ConfigManager |
| 111 | + from frequenz.sdk.actor import run as run_actors |
| 112 | +
|
| 113 | + async def run() -> None: |
| 114 | + config_channel = Broadcast[Mapping[str, Any]](name="config", resend_latest=True) |
| 115 | + actors = [ |
| 116 | + ConfigManager(config_paths=["config.toml"], output=config_channel.new_sender()), |
| 117 | + LoggingConfigUpdater( |
| 118 | + config_recv=config_channel.new_receiver(limit=1)).map( |
| 119 | + lambda app_config: app_config.get("logging", {} |
| 120 | + ) |
| 121 | + ), |
| 122 | + ] |
| 123 | + await run_actors(*actors) |
| 124 | +
|
| 125 | + asyncio.run(run()) |
| 126 | + ``` |
| 127 | +
|
| 128 | + Now whenever the `config.toml` file is updated, the logging configuration |
| 129 | + will be updated as well. |
| 130 | + """ |
| 131 | + |
| 132 | + def __init__( |
| 133 | + self, |
| 134 | + config_recv: Receiver[Mapping[str, Any]], |
| 135 | + log_format: str = "%(asctime)s %(levelname)-8s %(name)s:%(lineno)s: %(message)s", |
| 136 | + log_datefmt: str = "%Y-%m-%dT%H:%M:%S%z", |
| 137 | + ): |
| 138 | + """Initialize this instance. |
| 139 | +
|
| 140 | + Args: |
| 141 | + config_recv: The receiver to listen for configuration changes. |
| 142 | + log_format: Use the specified format string in logs. |
| 143 | + log_datefmt: Use the specified date/time format in logs. |
| 144 | + """ |
| 145 | + super().__init__() |
| 146 | + self._config_recv = config_recv |
| 147 | + self._format = log_format |
| 148 | + self._datefmt = log_datefmt |
| 149 | + |
| 150 | + # Setup default configuration. |
| 151 | + # This ensures logging is configured even if actor fails to start or |
| 152 | + # if the configuration cannot be loaded. |
| 153 | + self._current_config: LoggingConfig = LoggingConfig() |
| 154 | + self._update_logging(self._current_config) |
| 155 | + |
| 156 | + async def _run(self) -> None: |
| 157 | + """Listen for configuration changes and update logging.""" |
| 158 | + async for message in self._config_recv: |
| 159 | + try: |
| 160 | + new_config = LoggingConfig.load(message) |
| 161 | + except marshmallow.ValidationError: |
| 162 | + _logger.exception( |
| 163 | + "Invalid logging configuration received. Skipping config update" |
| 164 | + ) |
| 165 | + continue |
| 166 | + |
| 167 | + if new_config != self._current_config: |
| 168 | + self._update_logging(new_config) |
| 169 | + |
| 170 | + def _update_logging(self, config: LoggingConfig) -> None: |
| 171 | + """Configure the logging level.""" |
| 172 | + # If the logger is not in the new config, set it to NOTSET |
| 173 | + loggers_to_unset = self._current_config.loggers.keys() - config.loggers.keys() |
| 174 | + for logger_id in loggers_to_unset: |
| 175 | + _logger.debug("Unsetting log level for logger '%s'", logger_id) |
| 176 | + logging.getLogger(logger_id).setLevel(logging.NOTSET) |
| 177 | + |
| 178 | + self._current_config = config |
| 179 | + logging.basicConfig( |
| 180 | + format=self._format, |
| 181 | + level=self._current_config.root_logger.level, |
| 182 | + datefmt=self._datefmt, |
| 183 | + ) |
| 184 | + |
| 185 | + # For each logger in the new config, set the log level |
| 186 | + for logger_id, logger_config in self._current_config.loggers.items(): |
| 187 | + _logger.debug( |
| 188 | + "Setting log level for logger '%s' to '%s'", |
| 189 | + logger_id, |
| 190 | + logger_config.level, |
| 191 | + ) |
| 192 | + logging.getLogger(logger_id).setLevel(logger_config.level) |
| 193 | + |
| 194 | + _logger.info("Logging config changed to: %s", self._current_config) |
0 commit comments