Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/neptune_query/internal/composition/type_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import warnings
from collections import defaultdict
from concurrent.futures import Executor
from dataclasses import dataclass
Expand All @@ -28,6 +27,7 @@
from neptune_api.client import AuthenticatedClient

from ...exceptions import AttributeTypeInferenceError
from ...warnings import AttributeWarning
from .. import (
filters,
identifiers,
Expand All @@ -39,6 +39,7 @@
HISTOGRAM_SERIES_AGGREGATIONS,
STRING_SERIES_AGGREGATIONS,
)
from ..warnings import throttled_warn
from .attributes import fetch_attribute_definitions

T = TypeVar("T", bound=Union[filters._Filter, filters._Attribute, None])
Expand Down Expand Up @@ -155,9 +156,10 @@ def get_result_or_raise(self) -> T:
def emit_warnings(self) -> None:
for attr_state in self.attributes:
if attr_state.warning_text:
msg = f"Attribute '{attr_state.original_attribute.name}': {attr_state.warning_text}"
# TODO: Add category to warnings.py
warnings.warn(msg, stacklevel=3)
throttled_warn(
AttributeWarning(f"Attribute '{attr_state.original_attribute.name}': {attr_state.warning_text}"),
stacklevel=3,
)


def infer_attribute_types_in_filter(
Expand Down
20 changes: 8 additions & 12 deletions src/neptune_query/internal/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@
# limitations under the License.

import functools
import warnings
from typing import (
Callable,
ParamSpec,
TypeVar,
)

from neptune_query.internal.warnings import throttled_warn
from neptune_query.warnings import ExperimentalWarning

# registry of functions already warned
_warned_experimentals = set()

T = ParamSpec("T")
R = TypeVar("R")

Expand All @@ -38,14 +35,13 @@ def experimental(func: Callable[T, R]) -> Callable[T, R]:

@functools.wraps(func)
def wrapper(*args: T.args, **kwargs: T.kwargs) -> R:
if func not in _warned_experimentals:
warnings.warn(
f"{func.__qualname__} is experimental and might change or be removed "
"in a future minor release. Use with caution in production code.",
category=ExperimentalWarning,
stacklevel=2,
)
_warned_experimentals.add(func)
throttled_warn(
ExperimentalWarning(
f"`{func.__module__}.{func.__qualname__}` is experimental and might change or be removed "
"in a future minor release. Use with caution in production code."
),
stacklevel=3,
)
return func(*args, **kwargs)

return wrapper
40 changes: 39 additions & 1 deletion src/neptune_query/internal/retrieval/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,15 @@
from neptune_api.types import Response

from ... import exceptions
from ...warnings import (
Http5xxWarning,
Http429Warning,
Http503Warning,
HttpOtherWarning,
)
from .. import env
from ..logger import get_logger
from ..warnings import throttled_warn

logger = get_logger()

Expand Down Expand Up @@ -87,7 +94,6 @@ def wrapper(*args: T.args, **kwargs: T.kwargs) -> Response[R]:
response = None
try:
response = func(*args, **kwargs)

if 200 <= response.status_code.value < 300:
return response
except exceptions.NeptuneError:
Expand All @@ -111,12 +117,44 @@ def wrapper(*args: T.args, **kwargs: T.kwargs) -> Response[R]:
f"Neptune API request was rate limited. Retry-after header value: {sleep_time} seconds. "
f"Total time spent on rate limiting so far: {rate_limit_time_extension} seconds."
)
throttled_warn(
Http429Warning(
"Neptune API request was rate limited. We will slow down and retry automatically."
),
stacklevel=4,
)
elif response is not None and response.status_code.value == 503:
sleep_time = backoff_strategy(backoff_tries)
logger.debug(
f"Neptune API request failed. Backoff strategy recommends backing off for {sleep_time:.2f} "
f"seconds. Response: {response}. Last exception: {last_exc}."
)
throttled_warn(
Http503Warning("Neptune API is temporarily unavailable. We will retry automatically."),
stacklevel=4,
)
elif response is not None and 500 <= response.status_code.value < 600:
sleep_time = backoff_strategy(backoff_tries)
logger.debug(
f"Neptune API request failed. Backoff strategy recommends backing off for {sleep_time:.2f} "
f"seconds. Response: {response}. Last exception: {last_exc}."
)
throttled_warn(
Http5xxWarning(
f"Neptune API request failed with HTTP code {response.status_code.value}. "
"We will retry automatically."
),
stacklevel=4,
)
else:
sleep_time = backoff_strategy(backoff_tries)
logger.debug(
f"Neptune API request failed. Backoff strategy recommends backing off for {sleep_time:.2f} "
f"seconds. Response: {response}. Last exception: {last_exc}."
)
throttled_warn(
HttpOtherWarning("Neptune API request failed. We will retry automatically"), stacklevel=4
)

elapsed_time = time.monotonic() - start_time

Expand Down
72 changes: 72 additions & 0 deletions src/neptune_query/internal/warnings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#
# Copyright (c) 2025, Neptune Labs Sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import warnings
from datetime import (
datetime,
timedelta,
)
from typing import Type

from neptune_query.warnings import (
ExperimentalWarning,
Http5xxWarning,
Http429Warning,
HttpOtherWarning,
)

# registry of warnings that were already emitted with the (type, message) tuple
WARNING_TYPES_EMITTED_ONCE_PER_MESSAGE = (ExperimentalWarning,)

# registry of warning types that were already emitted with the time they should be silenced until
WARNING_TYPES_EMITTED_ONCE_PER_WHILE = (Http429Warning, Http5xxWarning, HttpOtherWarning)
WARNING_SUPPRESSION_DURATION = timedelta(seconds=20)

_silence_warnings_msg: set[tuple[Type[Warning], str]] = set()
_silence_warnings_until: dict[Type[Warning], datetime] = {}


def get_thread_id() -> int:
import threading

return threading.get_ident() % 123


def format_warning(warning: Warning) -> Warning:
# check if stderr is a terminal:
if sys.stderr.isatty():
orange_bold = "\033[1;38;2;255;165;0m"
end = "\033[0m"
msg = f"{orange_bold}{str(warning)}{end}"
return type(warning)(msg)
else:
return warning


def throttled_warn(warning: Warning, stacklevel: int = 3) -> None:
if isinstance(warning, WARNING_TYPES_EMITTED_ONCE_PER_MESSAGE):
key = (type(warning), str(warning))
if key in _silence_warnings_msg:
return
_silence_warnings_msg.add(key)

if isinstance(warning, WARNING_TYPES_EMITTED_ONCE_PER_WHILE):
warning_type = type(warning)
now = datetime.now()
if warning_type in _silence_warnings_until and _silence_warnings_until[warning_type] > now:
return
_silence_warnings_until[warning_type] = now + WARNING_SUPPRESSION_DURATION

warnings.warn(format_warning(warning), stacklevel=stacklevel)
20 changes: 20 additions & 0 deletions src/neptune_query/warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,25 @@
# limitations under the License.


class AttributeWarning(UserWarning):
"""Warning for attribute issues."""


class ExperimentalWarning(UserWarning):
"""Warning for use of experimental API elements."""


class Http429Warning(UserWarning):
"""Warning for retryable HTTP 429 responses (rate limiting)."""


class Http503Warning(UserWarning):
"""Warning for retryable HTTP 503 responses (service unavailable)."""


class Http5xxWarning(UserWarning):
"""Warning for retryable HTTP 5xx responses (server errors)."""


class HttpOtherWarning(UserWarning):
"""Warning for other retryable HTTP issues."""
Loading
Loading