Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pattern = "default-unprefixed"
python = "^3.10"

# Base neptune package
neptune-api = ">=0.20.0,<0.22.0"
neptune-api = ">=0.22.0,<0.23.0"
azure-storage-blob = "^12.7.0"
pandas = ">=1.4.0"

Expand Down
76 changes: 76 additions & 0 deletions src/neptune_query/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"list_attributes",
"fetch_experiments_table",
"fetch_metrics",
"fetch_metric_buckets",
"fetch_series",
"download_files",
]
Expand All @@ -45,16 +46,19 @@
resolve_destination_path,
resolve_experiments_filter,
resolve_files,
resolve_metrics_y,
resolve_sort_by,
)
from neptune_query.exceptions import NeptuneUserError
from neptune_query.internal.composition import download_files as _download_files
from neptune_query.internal.composition import fetch_metric_buckets as _fetch_metric_buckets
from neptune_query.internal.composition import fetch_metrics as _fetch_metrics
from neptune_query.internal.composition import fetch_series as _fetch_series
from neptune_query.internal.composition import fetch_table as _fetch_table
from neptune_query.internal.composition import list_attributes as _list_attributes
from neptune_query.internal.composition import list_containers as _list_containers
from neptune_query.internal.context import set_api_token
from neptune_query.internal.experimental import experimental
from neptune_query.internal.query_metadata_context import use_query_metadata
from neptune_query.internal.retrieval import search as _search

Expand Down Expand Up @@ -424,3 +428,75 @@ def download_files(
destination=destination_path,
container_type=_search.ContainerType.EXPERIMENT,
)


@experimental
@use_query_metadata(api_function="fetch_metric_buckets")
def fetch_metric_buckets(
    *,
    project: Optional[str] = None,
    experiments: Union[str, list[str], filters.Filter],
    # `Union[Literal["step"]]` collapses to `Literal["step"]`; use the direct form.
    x: Literal["step"] = "step",
    y: Union[str, list[str], filters.AttributeFilter],
    limit: int = 1000,
    lineage_to_the_root: bool = True,
    include_point_previews: bool = False,
) -> _pandas.DataFrame:
    """Fetches a table of metric values split by X-axis buckets.

    **Caution:** This function is experimental and might be changed or removed in a future minor release.
    Use with caution in production code.

    One point is returned from each bucket. To control the number of buckets, use the `limit` parameter.

    Both the first and last points of each metric are always included:
    - For every first bucket of a given series, the first point is returned.
    - For the remaining buckets, the last point is returned.

    Args:
        project: Path of the Neptune project, as `WorkspaceName/ProjectName`.
            If not provided, the NEPTUNE_PROJECT environment variable is used.
        experiments: Filter specifying which experiments to include.
            If a string is provided, it's treated as a regex pattern that the names must match.
            If a list of strings is provided, it's treated as exact experiment names to match.
            To provide a more complex condition on an arbitrary attribute value, pass a Filter object.
        x: The X-axis series used for the bucketing. Only "step" is currently supported.
        y: Filter specifying which metrics to include.
            If a string is provided, it's treated as a regex pattern that the metric names must match.
            If a list of strings is provided, it's treated as exact metric names to match.
            To provide a more complex condition, pass an AttributeFilter object.
        limit: Number of buckets to use. The default and maximum value is 1000.
        lineage_to_the_root: If True (default), includes all values from the complete experiment history.
            If False, only includes values from the most recent experiment in the lineage.
        include_point_previews: If False (default), the returned results only contain committed
            points. If True, the results also include preview points and the returned DataFrame will
            have additional sub-columns with preview status: is_preview and preview_completion.

    Returns:
        A pandas DataFrame with one point per (series, bucket).

    Example:
        From two specific experiments, fetch training losses split into 10 buckets:
        ```
        import neptune_query as nq


        nq.fetch_metric_buckets(
            experiments=["seagull-week1", "seagull-week2"],
            x="step",
            y=r"^train/loss",
            limit=10,  # Only 10 buckets for broad trends
        )
        ```
    """
    # Resolve the public-facing arguments into internal identifier/filter objects.
    project_identifier = get_default_project_identifier(project)
    experiments_filter = resolve_experiments_filter(experiments)
    resolved_y = resolve_metrics_y(y)

    return _fetch_metric_buckets.fetch_metric_buckets(
        project_identifier=project_identifier,
        filter_=experiments_filter,
        x=x,
        y=resolved_y,
        limit=limit,
        lineage_to_the_root=lineage_to_the_root,
        include_point_previews=include_point_previews,
        container_type=_search.ContainerType.EXPERIMENT,
    )
6 changes: 6 additions & 0 deletions src/neptune_query/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ def resolve_attributes_filter(
)


def resolve_metrics_y(
    attributes: Optional[Union[str, list[str], filters.AttributeFilter]],
) -> _filters._BaseAttributeFilter:
    """Translate the user-facing `y` metric selector into an internal attribute filter.

    Thin alias over `resolve_attributes_filter`: strings, lists of names, and
    AttributeFilter objects are all handled the same way as for other
    attribute-selection arguments.
    """
    resolved = resolve_attributes_filter(attributes)
    return resolved


def resolve_sort_by(sort_by: Union[str, filters.Attribute]) -> _filters._Attribute:
if isinstance(sort_by, str):
return filters.Attribute(sort_by)._to_internal()
Expand Down
27 changes: 27 additions & 0 deletions src/neptune_query/internal/composition/attribute_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
split,
util,
)
from . import attributes


def fetch_attribute_definitions_split(
Expand Down Expand Up @@ -163,3 +164,29 @@ def fetch_attribute_values_split(
downstream=downstream,
),
)


def fetch_attribute_values_by_filter_split(
    client: AuthenticatedClient,
    project_identifier: identifiers.ProjectIdentifier,
    executor: Executor,
    fetch_attribute_definitions_executor: Executor,
    sys_ids: list[identifiers.SysId],
    attribute_filter: filters._BaseAttributeFilter,
    downstream: Callable[[util.Page[att_vals.AttributeValue]], concurrency.OUT],
) -> concurrency.OUT:
    """Fan out attribute-value retrieval over chunks of sys ids.

    `sys_ids` is split into chunks; for each chunk a concurrent pipeline
    streams pages of attribute values matching `attribute_filter`, and every
    page is forwarded to `downstream`.
    """

    def _fetch_chunk(chunk: list[identifiers.SysId]) -> concurrency.OUT:
        # One concurrent sub-pipeline per chunk of sys ids.
        run_ids = [identifiers.RunIdentifier(project_identifier, sys_id) for sys_id in chunk]
        pages = attributes.fetch_attribute_values(
            client=client,
            project_identifier=project_identifier,
            run_identifiers=run_ids,
            attribute_filter=attribute_filter,
            executor=fetch_attribute_definitions_executor,
        )
        return concurrency.generate_concurrently(
            items=pages,
            executor=executor,
            downstream=downstream,
        )

    return concurrency.generate_concurrently(
        items=split.split_sys_ids(sys_ids),
        executor=executor,
        downstream=_fetch_chunk,
    )
54 changes: 53 additions & 1 deletion src/neptune_query/internal/composition/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
)
from ..composition import concurrency
from ..retrieval import attribute_definitions as att_defs
from ..retrieval import attribute_values as att_vals
from ..retrieval import util
from ..retrieval.attribute_filter import split_attribute_filters
from ..retrieval.attribute_types import TYPE_AGGREGATIONS


Expand Down Expand Up @@ -126,7 +128,7 @@ def go_fetch_single(
batch_size=batch_size,
)

filters_ = att_defs.split_attribute_filters(attribute_filter)
filters_ = split_attribute_filters(attribute_filter)

output = concurrency.generate_concurrently(
items=(filter_ for filter_ in filters_),
Expand All @@ -138,3 +140,53 @@ def go_fetch_single(
),
)
yield from concurrency.gather_results(output)


def fetch_attribute_values(
    client: AuthenticatedClient,
    project_identifier: identifiers.ProjectIdentifier,
    run_identifiers: Iterable[identifiers.RunIdentifier],
    attribute_filter: filters._BaseAttributeFilter,
    executor: Executor,
    batch_size: int = env.NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE.get(),
) -> Generator[util.Page[att_vals.AttributeValue], None, None]:
    """Stream pages of attribute values, de-duplicated across pages.

    Splitting the attribute filter can produce overlapping result sets, so
    each value already emitted in an earlier page is dropped from later ones.
    """
    emitted: set[att_vals.AttributeValue] = set()
    for page in _fetch_attribute_values(
        client, project_identifier, run_identifiers, attribute_filter, batch_size, executor
    ):
        # Filter against previously emitted values first, then record the
        # survivors — duplicates *within* a single page are kept as-is.
        fresh = [value for value in page.items if value not in emitted]
        emitted.update(fresh)
        yield util.Page(items=fresh)


def _fetch_attribute_values(
    client: AuthenticatedClient,
    project_identifier: identifiers.ProjectIdentifier,
    run_identifiers: Iterable[identifiers.RunIdentifier],
    attribute_filter: filters._BaseAttributeFilter,
    batch_size: int,
    executor: Executor,
) -> Generator[util.Page[att_vals.AttributeValue], None, None]:
    """Concurrently fetch attribute-value pages for every sub-filter.

    `attribute_filter` is split into simple filters; each one is fetched in
    its own concurrent pipeline and all resulting pages are gathered into a
    single stream (page order across sub-filters is not guaranteed).
    """
    sub_filters = split_attribute_filters(attribute_filter)

    def _pages_for(
        sub_filter: filters._AttributeFilter,
    ) -> Generator[util.Page[att_vals.AttributeValue], None, None]:
        # Retrieval stream for one simple (already-split) filter.
        return att_vals.fetch_attribute_values(
            client=client,
            project_identifier=project_identifier,
            run_identifiers=run_identifiers,
            attribute_definitions=sub_filter,
            batch_size=batch_size,
        )

    results = concurrency.generate_concurrently(
        items=iter(sub_filters),
        executor=executor,
        downstream=lambda sub_filter: concurrency.generate_concurrently(
            items=_pages_for(sub_filter),
            executor=executor,
            downstream=concurrency.return_value,
        ),
    )
    yield from concurrency.gather_results(results)
Loading
Loading