Skip to content

Commit e315b32

Browse files
committed
wip
1 parent f4007f5 commit e315b32

File tree

1 file changed

+87
-0
lines changed

1 file changed

+87
-0
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
#
2+
# Copyright (c) 2025, Neptune Labs Sp. z o.o.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from concurrent.futures import Executor
17+
from dataclasses import dataclass
18+
from typing import (
19+
Generator,
20+
Iterable,
21+
Literal,
22+
Optional,
23+
)
24+
25+
from neptune_api.client import AuthenticatedClient
26+
27+
from .. import (
28+
env,
29+
filters,
30+
identifiers,
31+
)
32+
from ..composition import concurrency
33+
from ..retrieval import attribute_definitions as att_defs
34+
from ..retrieval import util
35+
from ..retrieval.attribute_types import TYPE_AGGREGATIONS
36+
37+
38+
def fetch_run_attribute_definitions(
39+
client: AuthenticatedClient,
40+
project_identifiers: Iterable[identifiers.ProjectIdentifier],
41+
run_identifiers: Optional[Iterable[identifiers.RunIdentifier]],
42+
attribute_filter: filters._BaseAttributeFilter,
43+
executor: Executor,
44+
batch_size: int = env.NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE.get(),
45+
) -> Generator[util.Page[identifiers.RunAttributeDefinition], None, None]:
46+
pages_filters = _fetch_run_attribute_definitions(
47+
client, project_identifiers, run_identifiers, attribute_filter, batch_size, executor
48+
)
49+
50+
seen_items: set[identifiers.RunAttributeDefinition] = set()
51+
for page, filter_ in pages_filters:
52+
new_items = [item for item in page.items if item not in seen_items]
53+
seen_items.update(new_items)
54+
yield util.Page(items=new_items)
55+
56+
57+
def _fetch_run_attribute_definitions(
58+
client: AuthenticatedClient,
59+
project_identifiers: Iterable[identifiers.ProjectIdentifier],
60+
run_identifiers: Optional[Iterable[identifiers.RunIdentifier]],
61+
attribute_filter: filters._BaseAttributeFilter,
62+
batch_size: int,
63+
executor: Executor,
64+
) -> Generator[util.Page[identifiers.RunAttributeDefinition], None, None]:
65+
def go_fetch_single(
66+
filter_: filters._AttributeFilter,
67+
) -> Generator[util.Page[identifiers.AttributeDefinition], None, None]:
68+
return att_defs.fetch_attribute_definitions_single_filter(
69+
client=client,
70+
project_identifiers=project_identifiers,
71+
run_identifiers=run_identifiers,
72+
attribute_filter=filter_,
73+
batch_size=batch_size,
74+
)
75+
76+
filters_ = att_defs.split_attribute_filters(attribute_filter)
77+
78+
output = concurrency.generate_concurrently(
79+
items=(filter_ for filter_ in filters_),
80+
executor=executor,
81+
downstream=lambda filter_: concurrency.generate_concurrently(
82+
items=go_fetch_single(filter_),
83+
executor=executor,
84+
downstream=lambda _page: concurrency.return_value((_page, filter_)),
85+
),
86+
)
87+
yield from concurrency.gather_results(output)

0 commit comments

Comments
 (0)