diff --git a/src/neptune_query/internal/composition/type_inference.py b/src/neptune_query/internal/composition/type_inference.py index b580f1b5..61accf1b 100644 --- a/src/neptune_query/internal/composition/type_inference.py +++ b/src/neptune_query/internal/composition/type_inference.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import copy -import warnings from collections import defaultdict from concurrent.futures import Executor from dataclasses import dataclass @@ -28,6 +27,7 @@ from neptune_api.client import AuthenticatedClient from ...exceptions import AttributeTypeInferenceError +from ...warnings import AttributeWarning from .. import ( filters, identifiers, @@ -39,6 +39,7 @@ HISTOGRAM_SERIES_AGGREGATIONS, STRING_SERIES_AGGREGATIONS, ) +from ..warnings import throttled_warn from .attributes import fetch_attribute_definitions T = TypeVar("T", bound=Union[filters._Filter, filters._Attribute, None]) @@ -155,9 +156,10 @@ def get_result_or_raise(self) -> T: def emit_warnings(self) -> None: for attr_state in self.attributes: if attr_state.warning_text: - msg = f"Attribute '{attr_state.original_attribute.name}': {attr_state.warning_text}" - # TODO: Add category to warnings.py - warnings.warn(msg, stacklevel=3) + throttled_warn( + AttributeWarning(f"Attribute '{attr_state.original_attribute.name}': {attr_state.warning_text}"), + stacklevel=3, + ) def infer_attribute_types_in_filter( diff --git a/src/neptune_query/internal/experimental.py b/src/neptune_query/internal/experimental.py index 4868791d..1890cc38 100644 --- a/src/neptune_query/internal/experimental.py +++ b/src/neptune_query/internal/experimental.py @@ -14,18 +14,15 @@ # limitations under the License. import functools -import warnings from typing import ( Callable, ParamSpec, TypeVar, ) +from neptune_query.internal.warnings import throttled_warn from neptune_query.warnings import ExperimentalWarning -# registry of functions already warned -_warned_experimentals = set() - T = ParamSpec("T") R = TypeVar("R") @@ -38,14 +35,13 @@ def experimental(func: Callable[T, R]) -> Callable[T, R]: @functools.wraps(func) def wrapper(*args: T.args, **kwargs: T.kwargs) -> R: - if func not in _warned_experimentals: - warnings.warn( - f"{func.__qualname__} is experimental and might change or be removed " - "in a future minor release. Use with caution in production code.", - category=ExperimentalWarning, - stacklevel=2, - ) - _warned_experimentals.add(func) + throttled_warn( + ExperimentalWarning( + f"`{func.__module__}.{func.__qualname__}` is experimental and might change or be removed " + "in a future minor release. Use with caution in production code." + ), + stacklevel=3, + ) return func(*args, **kwargs) return wrapper diff --git a/src/neptune_query/internal/retrieval/retry.py b/src/neptune_query/internal/retrieval/retry.py index f8d7ba61..294216be 100644 --- a/src/neptune_query/internal/retrieval/retry.py +++ b/src/neptune_query/internal/retrieval/retry.py @@ -32,8 +32,15 @@ from neptune_api.types import Response from ... import exceptions +from ...warnings import ( + Http5xxWarning, + Http429Warning, + Http503Warning, + HttpOtherWarning, +) from .. import env from ..logger import get_logger +from ..warnings import throttled_warn logger = get_logger() @@ -87,7 +94,6 @@ def wrapper(*args: T.args, **kwargs: T.kwargs) -> Response[R]: response = None try: response = func(*args, **kwargs) - if 200 <= response.status_code.value < 300: return response except exceptions.NeptuneError: @@ -111,12 +117,44 @@ def wrapper(*args: T.args, **kwargs: T.kwargs) -> Response[R]: f"Neptune API request was rate limited. Retry-after header value: {sleep_time} seconds. " f"Total time spent on rate limiting so far: {rate_limit_time_extension} seconds." ) + throttled_warn( + Http429Warning( + "Neptune API request was rate limited. We will slow down and retry automatically." + ), + stacklevel=4, + ) + elif response is not None and response.status_code.value == 503: + sleep_time = backoff_strategy(backoff_tries) + logger.debug( + f"Neptune API request failed. Backoff strategy recommends backing off for {sleep_time:.2f} " + f"seconds. Response: {response}. Last exception: {last_exc}." + ) + throttled_warn( + Http503Warning("Neptune API is temporarily unavailable. We will retry automatically."), + stacklevel=4, + ) + elif response is not None and 500 <= response.status_code.value < 600: + sleep_time = backoff_strategy(backoff_tries) + logger.debug( + f"Neptune API request failed. Backoff strategy recommends backing off for {sleep_time:.2f} " + f"seconds. Response: {response}. Last exception: {last_exc}." + ) + throttled_warn( + Http5xxWarning( + f"Neptune API request failed with HTTP code {response.status_code.value}. " + "We will retry automatically." + ), + stacklevel=4, + ) else: sleep_time = backoff_strategy(backoff_tries) logger.debug( f"Neptune API request failed. Backoff strategy recommends backing off for {sleep_time:.2f} " f"seconds. Response: {response}. Last exception: {last_exc}." ) + throttled_warn( + HttpOtherWarning("Neptune API request failed. We will retry automatically"), stacklevel=4 + ) elapsed_time = time.monotonic() - start_time diff --git a/src/neptune_query/internal/warnings.py b/src/neptune_query/internal/warnings.py new file mode 100644 index 00000000..79b9ab25 --- /dev/null +++ b/src/neptune_query/internal/warnings.py @@ -0,0 +1,66 @@ +# +# Copyright (c) 2025, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import sys +import warnings +from datetime import ( + datetime, + timedelta, +) +from typing import Type + +from neptune_query.warnings import ( + ExperimentalWarning, + Http5xxWarning, + Http429Warning, + HttpOtherWarning, +) + +# registry of warnings that were already emitted with the (type, message) tuple +WARNING_TYPES_EMITTED_ONCE_PER_MESSAGE = (ExperimentalWarning,) + +# registry of warning types that were already emitted with the time they should be silenced until +WARNING_TYPES_EMITTED_ONCE_PER_WHILE = (Http429Warning, Http5xxWarning, HttpOtherWarning) +WARNING_SUPPRESSION_DURATION = timedelta(seconds=20) + +_silence_warnings_msg: set[tuple[Type[Warning], str]] = set() +_silence_warnings_until: dict[Type[Warning], datetime] = {} + + +def format_warning(warning: Warning) -> Warning: + # check if stderr is a terminal: + if sys.stderr.isatty(): + orange_bold = "\033[1;38;2;255;165;0m" + end = "\033[0m" + msg = f"{orange_bold}{str(warning)}{end}" + return type(warning)(msg) + else: + return warning + + +def throttled_warn(warning: Warning, stacklevel: int = 3) -> None: + if isinstance(warning, WARNING_TYPES_EMITTED_ONCE_PER_MESSAGE): + key = (type(warning), str(warning)) + if key in _silence_warnings_msg: + return + _silence_warnings_msg.add(key) + + if isinstance(warning, WARNING_TYPES_EMITTED_ONCE_PER_WHILE): + warning_type = type(warning) + now = datetime.now() + if warning_type in _silence_warnings_until and _silence_warnings_until[warning_type] > now: + return + _silence_warnings_until[warning_type] = now + WARNING_SUPPRESSION_DURATION + + warnings.warn(format_warning(warning), stacklevel=stacklevel) diff --git a/src/neptune_query/warnings.py b/src/neptune_query/warnings.py index da0e6588..026bf592 100644 --- a/src/neptune_query/warnings.py +++ b/src/neptune_query/warnings.py @@ -14,5 +14,25 @@ # limitations under the License. +class AttributeWarning(UserWarning): + """Warning for attribute issues.""" + + class ExperimentalWarning(UserWarning): """Warning for use of experimental API elements.""" + + +class Http429Warning(UserWarning): + """Warning for retryable HTTP 429 responses (rate limiting).""" + + +class Http503Warning(UserWarning): + """Warning for retryable HTTP 503 responses (service unavailable).""" + + +class Http5xxWarning(UserWarning): + """Warning for retryable HTTP 5xx responses (server errors).""" + + +class HttpOtherWarning(UserWarning): + """Warning for other retryable HTTP issues.""" diff --git a/tests/unit/internal/test_warnings.py b/tests/unit/internal/test_warnings.py new file mode 100644 index 00000000..55abd650 --- /dev/null +++ b/tests/unit/internal/test_warnings.py @@ -0,0 +1,161 @@ +# +# Copyright (c) 2025, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from unittest.mock import patch + +import pytest + +from neptune_query.internal.warnings import ( + _silence_warnings_msg, + _silence_warnings_until, + throttled_warn, +) +from neptune_query.warnings import ( + ExperimentalWarning, + Http5xxWarning, + Http429Warning, +) + +TIME_00_01 = datetime(2025, 9, 11, 15, 0, 1) +TIME_00_03 = datetime(2025, 9, 11, 15, 0, 3) +TIME_00_07 = datetime(2025, 9, 11, 15, 0, 7) +TIME_00_10 = datetime(2025, 9, 11, 15, 0, 10) +TIME_00_18 = datetime(2025, 9, 11, 15, 0, 18) +TIME_00_22 = datetime(2025, 9, 11, 15, 0, 22) +TIME_00_25 = datetime(2025, 9, 11, 15, 0, 25) + + +@pytest.fixture(autouse=True) +def clear_warning_registries(): + # Clear the warning registries before each test + _silence_warnings_msg.clear() + _silence_warnings_until.clear() + + +def emit_warning_at_time(time, warning): + """Helper function to emit a warning at a specific time by mocking INITIAL_TIME""" + with patch("neptune_query.internal.warnings.datetime") as mock_datetime: + mock_datetime.now.return_value = time + throttled_warn(warning) + + +@patch("neptune_query.internal.warnings.warnings.warn") +def test_regular_warning(mock_warn): + """Regular warnings should be emitted every time""" + warning = Warning("Test warning") + emit_warning_at_time(TIME_00_01, warning) + mock_warn.assert_called_once_with(warning, stacklevel=3) + + # Second emission should also go through + emit_warning_at_time(TIME_00_01, warning) + assert mock_warn.call_count == 2 + + +@patch("neptune_query.internal.warnings.warnings.warn") +def test_experimental_warning_once_per_message(mock_warn): + """ExperimentalWarning with the same message should be emitted only once""" + warning = ExperimentalWarning("Test experimental feature") + different_warning = ExperimentalWarning("Different message") + + # First emission should go through + emit_warning_at_time(TIME_00_01, warning) + mock_warn.assert_called_once_with(warning, stacklevel=3) + + # Second emission with same message should be suppressed + emit_warning_at_time(TIME_00_03, warning) + assert mock_warn.call_count == 1 + + # Different message should go through + emit_warning_at_time(TIME_00_03, different_warning) + assert mock_warn.call_count == 2 + + # Emitting the same warnings again should still be suppressed + emit_warning_at_time(TIME_00_10, warning) + emit_warning_at_time(TIME_00_10, different_warning) + assert mock_warn.call_count == 2 + + +@patch("neptune_query.internal.warnings.warnings.warn") +def test_http429_warning_once_per_20s(mock_warn): + """Http429Warning should be emitted once per minute""" + warning = Http429Warning("Rate limit exceeded") + different_warning = Http429Warning("Another rate limit message") + non_throttled_warning = Warning("Non-throttled warning") + + # First emission should go through + emit_warning_at_time(TIME_00_01, warning) + mock_warn.assert_called_once_with(warning, stacklevel=3) + + # Further emissions within seconds should be suppressed + emit_warning_at_time(TIME_00_03, warning) # 2s after first emission + emit_warning_at_time(TIME_00_07, warning) # 6s after first emission + emit_warning_at_time(TIME_00_18, warning) # 17s after first emission + assert mock_warn.call_count == 1 + + # Non-throttled warning should go through + emit_warning_at_time(TIME_00_07, non_throttled_warning) + emit_warning_at_time(TIME_00_10, non_throttled_warning) + assert mock_warn.call_count == 3 + + # Different message but same type should also be suppressed + emit_warning_at_time(TIME_00_18, different_warning) + assert mock_warn.call_count == 3 + + # After 21 seconds since first emission, it should go through again + emit_warning_at_time(TIME_00_22, warning) + assert mock_warn.call_count == 4 + + +@patch("neptune_query.internal.warnings.warnings.warn") +def test_http429_and_http5xx_warnings(mock_warn): + """Http429Warning and Http5xxWarning should be throttled independently""" + warning_429 = Http429Warning("Rate limit exceeded") + warning_5xx = Http5xxWarning("Server error") + non_throttled_warning = Warning("Non-throttled warning") + + # Emit Http429Warning + emit_warning_at_time(TIME_00_01, warning_429) + assert mock_warn.call_count == 1 + mock_warn.assert_called_once_with(warning_429, stacklevel=3) + + # Emit Http5xxWarning 2 seconds later + emit_warning_at_time(TIME_00_03, warning_5xx) + assert mock_warn.call_count == 2 + mock_warn.assert_called_with(warning_5xx, stacklevel=3) + + # Further emissions within 20 seconds should be suppressed + emit_warning_at_time(TIME_00_07, warning_429) # 6s after first warning_429 + emit_warning_at_time(TIME_00_10, warning_5xx) # 7s after first warning_5xx + assert mock_warn.call_count == 2 + + # Non-throttled warning should go through + emit_warning_at_time(TIME_00_10, non_throttled_warning) + emit_warning_at_time(TIME_00_18, non_throttled_warning) + assert mock_warn.call_count == 4 + + # After 21 seconds since first emission, warning_429 should go through again + emit_warning_at_time(TIME_00_22, warning_429) + assert mock_warn.call_count == 5 + mock_warn.assert_called_with(warning_429, stacklevel=3) + + # Time for 5xx warnings is not yet up + emit_warning_at_time(TIME_00_22, warning_5xx) + assert mock_warn.call_count == 5 + + # After another 3 seconds (total 22s from first 5xx warning), 5xx warning should go through + emit_warning_at_time(TIME_00_25, warning_5xx) + assert mock_warn.call_count == 6 + mock_warn.assert_called_with(warning_5xx, stacklevel=3)