Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions src/zenml/config/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from zenml.models import PipelineSnapshotBase
from zenml.pipelines.run_utils import get_default_run_name
from zenml.utils import pydantic_utils, secret_utils, settings_utils
from zenml.utils.warnings import WARNING_CONTROLLER, WarningCodes

if TYPE_CHECKING:
from zenml.pipelines.pipeline_definition import Pipeline
Expand Down Expand Up @@ -369,6 +370,37 @@ def _verify_upstream_steps(
f"steps in this pipeline: {available_steps}."
)

@staticmethod
def _validate_docker_settings_usage(
docker_settings: "BaseSettings | None",
stack: "Stack",
) -> None:
"""Validates that docker settings are used with a proper stack.

Generates warning for improper docker settings usage or returns.

Args:
docker_settings: The docker settings specified for the step/pipeline.
stack: The stack the settings are validated against.

"""
from zenml.orchestrators import LocalOrchestrator

if not docker_settings:
return

warning_message = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do actually have a few orchestrators that are not containerized but still use some DockerSettings attributes to configure their behaviour (WheeledOrchestrator, LightningOrchestrator). So we either need to get rid of this warning here or also include those two

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Do we have any production orchestrators that don't make use of the docker settings? Seems like we just need to catch the LocalOrchestrator here and info the message 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think just the local one ignores them entirely

"You are specifying docker settings but the orchestrator"
" you are using (LocalOrchestrator) will not make use of them. "
"Consider switching stacks, removing the settings, or using a "
"different orchestrator."
)

if isinstance(stack.orchestrator, LocalOrchestrator):
WARNING_CONTROLLER.info(
warning_code=WarningCodes.ZML002, message=warning_message
)

def _filter_and_validate_settings(
self,
settings: Dict[str, "BaseSettings"],
Expand All @@ -390,16 +422,19 @@ def _filter_and_validate_settings(
Returns:
The filtered settings.
"""
from zenml.config.constants import DOCKER_SETTINGS_KEY

validated_settings = {}

for key, settings_instance in settings.items():
resolver = SettingsResolver(key=key, settings=settings_instance)
try:
settings_instance = resolver.resolve(stack=stack)
except KeyError:
logger.info(
"Not including stack component settings with key `%s`.",
key,
WARNING_CONTROLLER.info(
warning_code=WarningCodes.ZML001,
message="Not including stack component settings with key {key}",
key=key,
)
continue

Expand Down Expand Up @@ -429,6 +464,11 @@ def _filter_and_validate_settings(

validated_settings[key] = settings_instance

self._validate_docker_settings_usage(
stack=stack,
docker_settings=settings.get(DOCKER_SETTINGS_KEY),
)

return validated_settings

def _get_step_spec(
Expand Down
3 changes: 3 additions & 0 deletions src/zenml/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ def init_logging() -> None:
for handler in root_logger.handlers
)

# logging capture warnings
logging.captureWarnings(True)

if not has_console_handler:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(get_formatter())
Expand Down
28 changes: 28 additions & 0 deletions src/zenml/utils/warnings/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Initialization of the warning utils module.

The `warnings` module contains utilities that help centralize
and improve warning management.
"""

from zenml.utils.warnings.controller import WarningController
from zenml.utils.warnings.registry import WarningCodes

WARNING_CONTROLLER = WarningController.create()

__all__ = [
"WARNING_CONTROLLER",
"WarningCodes"
]
57 changes: 57 additions & 0 deletions src/zenml/utils/warnings/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Warning configuration class and helper enums."""

from pydantic import BaseModel, Field

from zenml.utils.enum_utils import StrEnum


class WarningSeverity(StrEnum):
"""Enum class describing the warning severity."""

LOW = "low"
MEDIUM = "medium"
HIGH = "high"


class WarningCategory(StrEnum):
"""Enum class describing the warning category."""

USAGE = "USAGE"


class WarningVerbosity(StrEnum):
"""Enum class describing the warning verbosity."""

LOW = "low"
MEDIUM = "medium"
HIGH = "high"


class WarningConfig(BaseModel, use_enum_values=True):
"""Warning config class describing how warning messages should be displayed."""

description: str = Field(description="Description of the warning message")
severity: WarningSeverity = WarningSeverity.MEDIUM
category: WarningCategory
code: str
is_throttled: bool = Field(
description="If the warning is throttled. Throttled warnings should be displayed once per session.",
default=False,
)
verbosity: WarningVerbosity = Field(
description="Verbosity of the warning. Low verbosity displays basic details (code & message). Medium displays also call details like module & line number. High displays description and additional info.",
default=WarningVerbosity.MEDIUM,
)
168 changes: 168 additions & 0 deletions src/zenml/utils/warnings/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# Copyright (c) ZenML GmbH 2025. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Module for centralized WarningController implementation."""

import logging
from collections import Counter
from typing import Any

from zenml.enums import LoggingLevels
from zenml.utils.singleton import SingletonMetaClass
from zenml.utils.warnings.base import WarningConfig, WarningVerbosity
from zenml.utils.warnings.registry import WarningCodes

logger = logging.getLogger(__name__)


class WarningController(metaclass=SingletonMetaClass):
"""Class responsible for centralized handling of warning messages."""

def __init__(self) -> None:
"""WarningController constructor."""
self._warning_configs: dict[str, WarningConfig] = {}
self._warning_statistics: dict[str, int] = Counter()

def register(self, warning_configs: dict[str, WarningConfig]) -> None:
"""Register a warning config collection to the controller.

Args:
warning_configs: Configs to be registered. Key should be the warning code.
"""
self._warning_configs.update(warning_configs)

@staticmethod
def _resolve_call_details() -> tuple[str, int]:
import inspect

frame = inspect.stack()[3] # public methods -> _log -> _resolve
module = inspect.getmodule(frame[0])
module_name = module.__name__ if module else "<unknown module>"
line_number = frame.lineno

return module_name, line_number

@staticmethod
def _get_display_message(
message: str,
module_name: str,
line_number: int,
config: WarningConfig,
) -> str:
"""Helper method to build the warning message string.

Args:
message: The warning message.
module_name: The module that the warning call originated from.
line_number: The line number that the warning call originated from.
config: The warning configuration.

Returns:
A warning message containing extra fields/info based on warning config.
"""
display = f"[{config.code}]({config.category}) - {message}"

if config.verbosity == WarningVerbosity.MEDIUM:
display = f"{module_name}:{line_number} {display}"

if config.verbosity == WarningVerbosity.HIGH:
display = f"{display}\n{config.description}"

return display

def _log(
self,
warning_code: WarningCodes,
message: str,
level: LoggingLevels,
**kwargs: dict[str, Any],
) -> None:
"""Core function for warning handling.

Args:
warning_code: The code of the warning configuration.
message: The warning message.
level: The level of the warning.
**kwargs: Keyword arguments (for formatted messages).

"""
warning_config = self._warning_configs.get(warning_code)

# resolves the module and line number of the warning call.
module_name, line_number = self._resolve_call_details()

if not warning_config:
# If no config is available just follow default behavior:
logger.warning(f"{module_name}:{line_number} - {message}")
return

if warning_config.is_throttled:
if warning_code in self._warning_statistics:
# Throttled code has already appeared - skip.
return

display_message = self._get_display_message(
message=message,
module_name=module_name,
line_number=line_number,
config=warning_config,
)

self._warning_statistics[warning_code] += 1

if level == LoggingLevels.INFO:
logger.info(display_message.format(**kwargs))
else:
# Assumes warning level is the default if an invalid option is passed.
logger.warning(display_message.format(**kwargs))

def warn(
self, *, warning_code: WarningCodes, message: str, **kwargs: Any
) -> None:
"""Method to execute warning handling logic with warning log level.

Args:
warning_code: The code of the warning (see WarningCodes enum)
message: The message to display.
**kwargs: Keyword arguments (for formatted messages).
"""
self._log(warning_code, message, LoggingLevels.WARNING, **kwargs)

def info(
self, *, warning_code: WarningCodes, message: str, **kwargs: Any
) -> None:
"""Method to execute warning handling logic with info log level.

Args:
warning_code: The code of the warning (see WarningCodes enum)
message: The message to display.
**kwargs: Keyword arguments (for formatted messages).
"""
self._log(warning_code, message, LoggingLevels.INFO, **kwargs)

@staticmethod
def create() -> "WarningController":
"""Factory function for WarningController.

Creates a new warning controller and registers system warning configs.

Returns:
A warning controller instance.
"""
from zenml.utils.warnings.registry import WARNING_CONFIG_REGISTRY

registry = WarningController()

registry.register(warning_configs=WARNING_CONFIG_REGISTRY)

return registry
Loading
Loading