Skip to content

Commit 9ee2fcb

Browse files
committed
feat: add x_range to fetch_time_series_buckets
1 parent e7ad623 commit 9ee2fcb

File tree

3 files changed

+164
-3
lines changed

3 files changed

+164
-3
lines changed

src/neptune_query/internal/composition/fetch_metric_buckets.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ def go_fetch_sys_attrs() -> Generator[list[identifiers.SysId], None, None]:
170170
include_point_previews=include_point_previews,
171171
limit=limit,
172172
container_type=container_type,
173+
x_range=None
173174
)
174175

175176
return buckets_data, sys_id_label_mapping

src/neptune_query/internal/retrieval/metric_buckets.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def fetch_time_series_buckets(
9292
lineage_to_the_root: bool,
9393
include_point_previews: bool,
9494
limit: int,
95+
x_range: Optional[tuple[float, float]],
9596
) -> dict[RunAttributeDefinition, list[TimeseriesBucket]]:
9697
run_attribute_definitions = list(run_attribute_definitions)
9798

@@ -128,14 +129,15 @@ def fetch_time_series_buckets(
128129
)
129130

130131
view = ProtoView(
131-
# from=0.0,
132-
# to=1.0,
133-
# pointFilters=ProtoPointFilters(),
134132
maxBuckets=limit,
135133
xScale=ProtoScale.linear,
136134
yScale=ProtoScale.linear,
137135
xAxis=xAxis,
138136
)
137+
if x_range is not None:
138+
x_from, x_to = x_range
139+
view.to = x_to
140+
setattr(view, "from", x_from) # from is a reserved keyword in Python
139141

140142
request_object = ProtoGetTimeseriesBucketsRequest(
141143
expressions=expressions.values(),
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import numpy as np
2+
import pytest
3+
4+
from neptune_query.internal.identifiers import (
5+
AttributeDefinition,
6+
RunAttributeDefinition,
7+
)
8+
from neptune_query.internal.retrieval.metric_buckets import (
9+
TimeseriesBucket,
10+
fetch_time_series_buckets,
11+
)
12+
from neptune_query.internal.retrieval.search import ContainerType
13+
from tests.e2e.data import (
14+
FLOAT_SERIES_PATHS,
15+
PATH,
16+
TEST_DATA,
17+
)
18+
19+
EXPERIMENT = TEST_DATA.experiments[0]
20+
21+
22+
def test_fetch_time_series_buckets_does_not_exist(client, project, experiment_identifier):
    """Requesting buckets for an attribute that does not exist yields an empty list for that run."""
    # given
    missing_attribute = RunAttributeDefinition(
        experiment_identifier, AttributeDefinition("does-not-exist", "string")
    )

    # when
    buckets = fetch_time_series_buckets(
        client,
        run_attribute_definitions=[missing_attribute],
        container_type=ContainerType.EXPERIMENT,
        x="step",
        lineage_to_the_root=False,
        include_point_previews=False,
        limit=10,
        x_range=None,
    )

    # then
    assert buckets == {missing_attribute: []}
40+
41+
42+
def _calculate_bucket_ranges(
43+
series: list[tuple[float, float]], limit: int, x_range: tuple[float, float] | None
44+
) -> list[TimeseriesBucket]:
45+
if x_range is not None:
46+
range_from, range_to = x_range
47+
else:
48+
xs = [x for x, y in series]
49+
range_from, range_to = min(xs), max(xs)
50+
51+
bucket_ranges = []
52+
bucket_width = (range_to - range_from) / (limit - 1)
53+
for bucket_i in range(limit + 1):
54+
if bucket_i == 0:
55+
from_x = float("-inf")
56+
else:
57+
from_x = range_from + bucket_width * (bucket_i - 1)
58+
59+
if bucket_i == limit:
60+
to_x = float("inf")
61+
else:
62+
to_x = range_from + bucket_width * bucket_i
63+
bucket_ranges.append((from_x, to_x))
64+
return bucket_ranges
65+
66+
67+
def _aggregate_buckets(
    series: list[tuple[float, float]], limit: int, x_range: tuple[float, float] | None
) -> list[TimeseriesBucket]:
    """Reference aggregation: bin *series* points into buckets and compute per-bucket stats.

    Mirrors the backend's bucketing so test expectations can be computed locally.
    Buckets that receive no points at all are omitted from the result.
    """
    ranges = _calculate_bucket_ranges(series, limit, x_range)

    aggregated = []
    for idx, (lo, hi) in enumerate(ranges):
        total = 0
        pos_inf = 0
        neg_inf = 0
        nans = 0
        finite_xs: list[float] = []
        finite_ys: list[float] = []
        for px, py in series:
            # Buckets are half-open (lo, hi]; the first bucket also admits px == lo.
            if not (lo < px <= hi or (idx == 0 and px == lo)):
                continue
            total += 1
            if np.isposinf(py):
                pos_inf += 1
            elif np.isneginf(py):
                neg_inf += 1
            elif np.isnan(py):
                nans += 1
            else:
                finite_xs.append(px)
                finite_ys.append(py)
        if total == 0:
            continue

        aggregated.append(
            TimeseriesBucket(
                index=idx,
                from_x=lo,
                to_x=hi,
                first_x=finite_xs[0] if finite_xs else float("nan"),
                first_y=finite_ys[0] if finite_ys else float("nan"),
                last_x=finite_xs[-1] if finite_xs else float("nan"),
                last_y=finite_ys[-1] if finite_ys else float("nan"),
                y_min=float(np.min(finite_ys)) if finite_ys else float("nan"),
                y_max=float(np.max(finite_ys)) if finite_ys else float("nan"),
                finite_point_count=len(finite_ys),
                nan_count=nans,
                positive_inf_count=pos_inf,
                negative_inf_count=neg_inf,
                finite_points_sum=float(np.sum(finite_ys)) if finite_ys else 0.0,
            )
        )
    return aggregated
115+
116+
117+
@pytest.mark.parametrize(
    "attribute_name, expected_values",
    [
        (
            FLOAT_SERIES_PATHS[0],
            list(zip(EXPERIMENT.float_series[f"{PATH}/metrics/step"], EXPERIMENT.float_series[FLOAT_SERIES_PATHS[0]])),
        ),
    ],
)
@pytest.mark.parametrize(
    "limit",
    [2, 10, 100],
)
@pytest.mark.parametrize(
    "x_range",
    [None, (1, 2), (-100, 100)],
)
def test_fetch_time_series_buckets_single_series(
    client, project, experiment_identifier, attribute_name, expected_values, limit, x_range
):
    """Buckets returned by the backend match a locally computed reference aggregation.

    Covers the default x range (None), a narrow sub-range, and a range wider
    than the data, across several bucket limits.
    """
    # given
    run_definition = RunAttributeDefinition(experiment_identifier, AttributeDefinition(attribute_name, "float-series"))

    # when
    result = fetch_time_series_buckets(
        client,
        run_attribute_definitions=[run_definition],
        container_type=ContainerType.EXPERIMENT,
        x="step",
        lineage_to_the_root=False,
        include_point_previews=False,
        limit=limit,
        x_range=x_range,
    )

    # then
    # Debug prints removed: pytest reports the failing diff itself, so dumping
    # every bucket to stdout on each parametrized run only added noise.
    expected_buckets = _aggregate_buckets(expected_values, limit, x_range)
    assert result == {run_definition: expected_buckets}

0 commit comments

Comments
 (0)