Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/neptune_query/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
resolve_files,
resolve_metrics_y,
resolve_sort_by,
validate_limit,
)
from neptune_query.exceptions import NeptuneUserError
from neptune_query.internal.composition import download_files as _download_files
Expand Down Expand Up @@ -488,7 +487,6 @@ def fetch_metric_buckets(
project_identifier = get_default_project_identifier(project)
experiments_filter = resolve_experiments_filter(experiments)
resolved_y = resolve_metrics_y(y)
validate_limit(limit, max_limit=1000)

return _fetch_metric_buckets.fetch_metric_buckets(
project_identifier=project_identifier,
Expand Down
12 changes: 1 addition & 11 deletions src/neptune_query/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import pandas as pd

from neptune_query import filters
from neptune_query.exceptions import (
NeptuneProjectNotProvided,
NeptuneUserError,
)
from neptune_query.exceptions import NeptuneProjectNotProvided
from neptune_query.internal import filters as _filters
from neptune_query.internal.context import get_context
from neptune_query.internal.identifiers import ProjectIdentifier
Expand Down Expand Up @@ -160,10 +157,3 @@ def resolve_destination_path(destination: Optional[Union[str, pathlib.Path]]) ->
return destination.resolve()
else:
return pathlib.Path(destination).resolve()


def validate_limit(limit: int, max_limit: int) -> None:
    """Ensure that `limit` is a positive integer not exceeding `max_limit`.

    Raises:
        NeptuneUserError: if `limit` is non-positive or greater than `max_limit`.
    """
    error_message = None
    if limit <= 0:
        error_message = f"Limit must be a positive integer. Got: {limit}"
    elif limit > max_limit:
        error_message = f"Limit cannot be greater than {max_limit}. Got: {limit}"
    if error_message is not None:
        raise NeptuneUserError(error_message)
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ def fetch_metric_buckets(
container_type: ContainerType,
) -> pd.DataFrame:
validation.validate_metrics_x(x)
validation.validate_limit(limit)
validation.validate_bucket_limit(limit)
restricted_y = validation.restrict_attribute_filter_type(y, type_in={"float_series"})
limit = limit + 1 # we request one extra bucket because the 1st one is (-inf, 1st point] and we merge it

valid_context = validate_context(context or get_context())
client = get_client(context=valid_context)
Expand Down
9 changes: 9 additions & 0 deletions src/neptune_query/internal/composition/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ def validate_limit(limit: Optional[int]) -> None:
_validate_optional_positive_int(limit, "limit")


def validate_bucket_limit(limit: int, max_limit: int = 1000) -> None:
    """Validate the number of buckets requested from the metric-buckets API.

    Args:
        limit: Requested bucket count; must be an `int` in the range [1, max_limit].
        max_limit: Upper bound on the accepted limit. Defaults to 1000,
            matching the previous hard-coded maximum.

    Raises:
        ValueError: if `limit` is not an integer, is non-positive,
            or is greater than `max_limit`.
    """
    # bool is a subclass of int, but True/False are never meaningful bucket counts.
    if isinstance(limit, bool) or not isinstance(limit, int):
        raise ValueError("limit must be an integer")
    if limit <= 0:
        raise ValueError(f"limit must be a positive integer. Got: {limit}")
    if limit > max_limit:
        raise ValueError(f"limit cannot be greater than {max_limit}. Got: {limit}")


def validate_metrics_x(x: Literal["step"]) -> Literal["step"]:
"""Validate that x is 'step' (the only valid value for now)."""
if x not in ("step",):
Expand Down
43 changes: 43 additions & 0 deletions src/neptune_query/internal/output_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,49 @@ def generate_categorized_rows() -> Generator[Tuple, None, None]:
df.index.name = None
df.columns.names = (container_column_name, "metric", "bucket")

df = _collapse_open_buckets(df)

return df


def _collapse_open_buckets(df: pd.DataFrame) -> pd.DataFrame:
    """
    Collapse the open-ended first bucket of a metric-buckets DataFrame.

    In the multi-bucket case, the first returned bucket is (-inf, first_point],
    which is merged with the second bucket (first_point, end] into one closed
    bucket [first_point, end]. For a single-bucket series, the one half-open
    interval (one finite endpoint, one infinite) is collapsed to the degenerate
    closed interval [point, point] instead.
    """
    df.index = df.index.astype(object)  # IntervalIndex cannot mix Intervals closed from different sides

    if df.index.empty:
        return df

    if len(df.index) == 1:
        # Single bucket: find the one finite endpoint (the single data point)
        # and replace the half-open interval with a degenerate closed one.
        # If both or neither endpoint is finite, the index is left untouched.
        finite_value = None
        if np.isfinite(df.index[0].right) and not np.isfinite(df.index[0].left):
            finite_value = df.index[0].right
        elif np.isfinite(df.index[0].left) and not np.isfinite(df.index[0].right):
            finite_value = df.index[0].left

        if finite_value is not None:
            new_interval = pd.Interval(left=finite_value, right=finite_value, closed="both")
            df.index = pd.Index([new_interval], dtype=object)
        return df

    # Per-column merge rule for the two rows being combined: take the last
    # non-NaN value. Keyed by the last level of the column MultiIndex ("x"/"y").
    col_funcs = {
        "x": lambda s: s[s.last_valid_index()] if s.last_valid_index() is not None else np.nan,
        "y": lambda s: s[s.last_valid_index()] if s.last_valid_index() is not None else np.nan,
    }

    first, second = df.index[0], df.index[1]
    if first.right >= second.left - second.length * 0.5:  # floats can be imprecise, we use bucket length as a tolerance
        # Adjacent buckets: merge rows 0 and 1 into one row indexed by the
        # closed interval [first.right, second.right].
        new_interval = pd.Interval(left=first.right, right=second.right, closed="both")
        new_row = df.iloc[0:2].apply(axis="index", func=lambda col: col_funcs[col.name[-1]](col))
        df = df.drop(index=[first, second])
        df.loc[new_interval] = new_row
        df = df.sort_index()
    else:
        # Gap after the first bucket: keep row 0 but re-label its open-ended
        # interval as a closed one starting at the first point, one bucket
        # wide (assumes uniform bucket width equal to second.length — TODO confirm).
        new_interval = pd.Interval(left=first.right, right=first.right + second.length, closed="both")
        df.index = [new_interval] + list(df.index[1:])

    return df


Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/v1/test_fetch_metric_buckets.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def create_expected_data_dict(
limit: int,
include_point_previews: bool, # TODO - add to the test data?
) -> pd.DataFrame:
bucket_ranges_x = _calculate_ranges_x(data, limit)
bucket_ranges_x = _calculate_ranges_x(data, limit + 1)

bucket_data: dict[RunAttributeDefinition, list[TimeseriesBucket]] = {}
for experiment_name, experiment_data in data.items():
Expand Down Expand Up @@ -238,7 +238,7 @@ def test__fetch_metric_buckets__filter_variants(

@pytest.mark.parametrize(
"limit",
[2, 3, 10, NUMBER_OF_STEPS + 10],
[1, 2, 3, 10, NUMBER_OF_STEPS + 10, 1000],
)
@pytest.mark.parametrize(
"include_point_previews",
Expand Down
109 changes: 92 additions & 17 deletions tests/unit/internal/test_output_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import numpy as np
import pandas as pd
import pytest
from pandas import Interval
from pandas._testing import assert_frame_equal

import neptune_query as npt
Expand Down Expand Up @@ -1358,12 +1359,8 @@ def test_create_empty_metric_buckets_dataframe():
)

# Then
expected_df = (
pd.DataFrame(data={"bucket": []}).astype(dtype={"bucket": "interval[float64, right]"}).set_index("bucket")
)
expected_df.columns = pd.MultiIndex.from_product(
[[], [], ["local_min", "local_max"]], names=["experiment", "metric", "bucket"]
)
expected_df = pd.DataFrame(data={"bucket": []}).astype(dtype={"bucket": "object"}).set_index("bucket")
expected_df.columns = pd.MultiIndex.from_product([[], [], ["x", "y"]], names=["experiment", "metric", "bucket"])
expected_df.index.name = None

pd.testing.assert_frame_equal(df, expected_df)
Expand All @@ -1384,7 +1381,7 @@ def test_create_metric_buckets_dataframe():
assert not df.empty, "DataFrame should not be empty"

# Check the shape of the DataFrame
num_expected_rows = BUCKETS
num_expected_rows = BUCKETS - 1
assert df.shape[0] == num_expected_rows, f"DataFrame should have {num_expected_rows} rows"

# Check the columns of the DataFrame
Expand All @@ -1403,6 +1400,77 @@ def test_create_metric_buckets_dataframe():
assert df.columns.get_level_values(2).nunique() == len(METRICS), f"DataFrame should have {METRICS} metrics"


@pytest.mark.parametrize(
    "data,expected_df",
    [
        # Single bucket: collapsed to the degenerate closed interval [20, 20].
        (
            {
                _generate_run_attribute_definition(experiment=1, path=1): [
                    _generate_bucket_metric(index=0),
                ]
            },
            pd.DataFrame(
                {
                    ("exp1", "path1", "x"): [20.0],
                    ("exp1", "path1", "y"): [0.0],
                },
                index=pd.Index([Interval(20.0, 20.0, closed="both")], dtype="object"),
            ),
        ),
        # Two buckets (indices 0 and 2): the open-ended first bucket is
        # re-labelled as the closed interval [20, 40]; the other stays right-closed.
        (
            {
                _generate_run_attribute_definition(experiment=1, path=1): [
                    _generate_bucket_metric(index=0),
                    _generate_bucket_metric(index=2),
                ]
            },
            pd.DataFrame(
                {
                    ("exp1", "path1", "x"): [20.0, 58.0],
                    ("exp1", "path1", "y"): [0.0, 200.0],
                },
                index=pd.Index(
                    [Interval(20.0, 40.0, closed="both"), Interval(40.0, 60.0, closed="right")], dtype="object"
                ),
            ),
        ),
        # Two buckets with a wider gap (indices 0 and 3): same re-labelling of
        # the first bucket; the gap between [20, 40] and (60, 80] is preserved.
        (
            {
                _generate_run_attribute_definition(experiment=1, path=1): [
                    _generate_bucket_metric(index=0),
                    _generate_bucket_metric(index=3),
                ]
            },
            pd.DataFrame(
                {
                    ("exp1", "path1", "x"): [20.0, 78.0],
                    ("exp1", "path1", "y"): [0.0, 300.0],
                },
                index=pd.Index(
                    [Interval(20.0, 40.0, closed="both"), Interval(60.0, 80.0, closed="right")], dtype="object"
                ),
            ),
        ),
    ],
)
def test_create_metric_buckets_dataframe_parametrized(data, expected_df):
    """Check bucket collapsing/re-labelling for representative bucket layouts."""
    # Given
    sys_id_label_mapping = {
        SysId("sysid1"): "exp1",
    }
    expected_df.columns.names = ["experiment", "metric", "bucket"]

    # When
    df = create_metric_buckets_dataframe(
        buckets_data=data,
        sys_id_label_mapping=sys_id_label_mapping,
        container_column_name="experiment",
    )

    # Then
    pd.testing.assert_frame_equal(df, expected_df)


def test_create_metric_buckets_dataframe_missing_values():
# Given
data = {
Expand Down Expand Up @@ -1432,17 +1500,17 @@ def test_create_metric_buckets_dataframe_missing_values():

# Then
expected = {
("exp1", "path1", "x"): [20.0, 38.0, np.nan],
("exp1", "path1", "y"): [0.0, 100.0, np.nan],
("exp1", "path2", "x"): [np.nan, 38.0, 58.0],
("exp1", "path2", "y"): [np.nan, 100.0, 200.0],
("exp2", "path1", "x"): [20.0, np.nan, 58.0],
("exp2", "path1", "y"): [0.0, np.nan, 200.00],
("exp1", "path1", "x"): [38.0, np.nan],
("exp1", "path1", "y"): [100.0, np.nan],
("exp1", "path2", "x"): [38.0, 58.0],
("exp1", "path2", "y"): [100.0, 200.0],
("exp2", "path1", "x"): [20.0, 58.0],
("exp2", "path1", "y"): [0.0, 200.00],
}

expected_df = pd.DataFrame(
dict(sorted(expected.items())),
index=pd.IntervalIndex.from_tuples([(float("-inf"), 20.0), (20.0, 40.0), (40.0, 60.0)]),
index=pd.Index([Interval(20.0, 40.0, closed="both"), Interval(40.0, 60.0, closed="right")]),
)
expected_df.columns.names = ["experiment", "metric", "bucket"]

Expand All @@ -1456,6 +1524,7 @@ def test_create_metric_buckets_dataframe_sorted():
_generate_bucket_metric(index=2),
_generate_bucket_metric(index=0),
_generate_bucket_metric(index=1),
_generate_bucket_metric(index=3),
],
}
sys_id_label_mapping = {
Expand All @@ -1470,13 +1539,19 @@ def test_create_metric_buckets_dataframe_sorted():

# Then
expected = {
("exp1", "path1", "x"): [20.0, 38.0, 58.0],
("exp1", "path1", "y"): [0.0, 100.0, 200.0],
("exp1", "path1", "x"): [38.0, 58.0, 78.0],
("exp1", "path1", "y"): [100.0, 200.0, 300.0],
}

expected_df = pd.DataFrame(
dict(sorted(expected.items())),
index=pd.IntervalIndex.from_tuples([(float("-inf"), 20.0), (20.0, 40.0), (40.0, 60.0)]),
index=pd.Index(
[
Interval(20.0, 40.0, closed="both"),
Interval(40.0, 60.0, closed="right"),
Interval(60.0, 80.0, closed="right"),
]
),
)
expected_df.columns.names = ["experiment", "metric", "bucket"]

Expand Down
Loading