Skip to content

Commit 0263a8f

Browse files
committed
PY-175 Rename envs from NEPTUNE_FETCHER_* to NEPTUNE_QUERY_* (#412)
* Rename `NEPTUNE_FETCHER_QUERY_SIZE_LIMIT` env to `NEPTUNE_FETCHER_MAX_REQUEST_SIZE`
* PY-175 Rename envs from NEPTUNE_FETCHER_* to NEPTUNE_QUERY_*
* Sort env variables
* Bump default NEPTUNE_QUERY_MAX_WORKERS to 32 (from 10) (#413)

The workers don't do much CPU work; they mostly wait for network IO, and 32 workers were shown to improve the performance of fetching files. Other than a slight increase in memory usage, there's no known downside to increasing this number to 32.
1 parent 7a9447b commit 0263a8f

File tree

11 files changed

+63
-66
lines changed

11 files changed

+63
-66
lines changed

docs/old/api_reference.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ Improves the performance of access to consecutive field values. Works only for s
424424
To speed up the fetching process, this method can use multithreading.
425425
To enable it, set the `use_threads` parameter to `True`.
426426

427-
By default, the maximum number of workers is 10. You can change this number by setting the `NEPTUNE_FETCHER_MAX_WORKERS`
427+
By default, the maximum number of workers is 32. You can change this number by setting the `NEPTUNE_QUERY_MAX_WORKERS`
428428
environment variable.
429429

430430
__Parameters__:

src/neptune_query/internal/composition/attributes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def fetch_attribute_definitions(
4747
run_identifiers: Optional[Iterable[identifiers.RunIdentifier]],
4848
attribute_filter: filters._BaseAttributeFilter,
4949
executor: Executor,
50-
batch_size: int = env.NEPTUNE_FETCHER_ATTRIBUTE_DEFINITIONS_BATCH_SIZE.get(),
50+
batch_size: int = env.NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE.get(),
5151
) -> Generator[util.Page[identifiers.AttributeDefinition], None, None]:
5252
pages_filters = _fetch_attribute_definitions(
5353
client, project_identifiers, run_identifiers, attribute_filter, batch_size, executor
@@ -66,7 +66,7 @@ def fetch_attribute_definition_aggregations(
6666
run_identifiers: Iterable[identifiers.RunIdentifier],
6767
attribute_filter: filters._BaseAttributeFilter,
6868
executor: Executor,
69-
batch_size: int = env.NEPTUNE_FETCHER_ATTRIBUTE_DEFINITIONS_BATCH_SIZE.get(),
69+
batch_size: int = env.NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE.get(),
7070
) -> Generator[
7171
tuple[util.Page[identifiers.AttributeDefinition], util.Page[AttributeDefinitionAggregation]], None, None
7272
]:

src/neptune_query/internal/composition/concurrency.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444

4545
def create_thread_pool_executor() -> Executor:
46-
max_workers = env.NEPTUNE_FETCHER_MAX_WORKERS.get()
46+
max_workers = env.NEPTUNE_QUERY_MAX_WORKERS.get()
4747
return ThreadPoolExecutor(max_workers=max_workers)
4848

4949

src/neptune_query/internal/env.py

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,21 @@
2222
)
2323

2424
__all__ = (
25-
"NEPTUNE_HTTP_REQUEST_TIMEOUT_SECONDS",
2625
"NEPTUNE_API_TOKEN",
27-
"NEPTUNE_FETCHER_MAX_WORKERS",
26+
"NEPTUNE_ENABLE_COLORS",
27+
"NEPTUNE_HTTP_REQUEST_TIMEOUT_SECONDS",
2828
"NEPTUNE_PROJECT",
29+
"NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE",
30+
"NEPTUNE_QUERY_ATTRIBUTE_VALUES_BATCH_SIZE",
31+
"NEPTUNE_QUERY_FILES_MAX_CONCURRENCY",
32+
"NEPTUNE_QUERY_FILES_TIMEOUT",
33+
"NEPTUNE_QUERY_MAX_REQUEST_SIZE",
34+
"NEPTUNE_QUERY_MAX_WORKERS",
35+
"NEPTUNE_QUERY_RETRY_HARD_TIMEOUT",
36+
"NEPTUNE_QUERY_RETRY_SOFT_TIMEOUT",
37+
"NEPTUNE_QUERY_SERIES_BATCH_SIZE",
38+
"NEPTUNE_QUERY_SYS_ATTRS_BATCH_SIZE",
2939
"NEPTUNE_VERIFY_SSL",
30-
"NEPTUNE_FETCHER_RETRY_SOFT_TIMEOUT",
31-
"NEPTUNE_FETCHER_RETRY_HARD_TIMEOUT",
32-
"NEPTUNE_FETCHER_SYS_ATTRS_BATCH_SIZE",
33-
"NEPTUNE_FETCHER_ATTRIBUTE_DEFINITIONS_BATCH_SIZE",
34-
"NEPTUNE_FETCHER_ATTRIBUTE_VALUES_BATCH_SIZE",
35-
"NEPTUNE_FETCHER_SERIES_BATCH_SIZE",
36-
"NEPTUNE_FETCHER_QUERY_SIZE_LIMIT",
37-
"NEPTUNE_FETCHER_FILES_MAX_CONCURRENCY",
38-
"NEPTUNE_FETCHER_FILES_TIMEOUT",
39-
"NEPTUNE_ENABLE_COLORS",
4040
)
4141

4242
T = TypeVar("T")
@@ -79,28 +79,25 @@ def wrapped(value: str) -> Optional[T]:
7979
return wrapped
8080

8181

82-
NEPTUNE_HTTP_REQUEST_TIMEOUT_SECONDS = EnvVariable[int]("NEPTUNE_HTTP_REQUEST_TIMEOUT_SECONDS", int, 60)
8382
NEPTUNE_API_TOKEN = EnvVariable[str]("NEPTUNE_API_TOKEN", _map_str)
83+
NEPTUNE_ENABLE_COLORS = EnvVariable[bool]("NEPTUNE_ENABLE_COLORS", _map_bool, True)
84+
NEPTUNE_HTTP_REQUEST_TIMEOUT_SECONDS = EnvVariable[int]("NEPTUNE_HTTP_REQUEST_TIMEOUT_SECONDS", int, 60)
8485
NEPTUNE_PROJECT = EnvVariable[str]("NEPTUNE_PROJECT", _map_str)
85-
NEPTUNE_VERIFY_SSL = EnvVariable[bool]("NEPTUNE_VERIFY_SSL", _map_bool, True)
86-
NEPTUNE_FETCHER_RETRY_SOFT_TIMEOUT = EnvVariable[Optional[int]](
87-
"NEPTUNE_FETCHER_RETRY_SOFT_TIMEOUT", _lift_optional(int), 300
88-
)
89-
NEPTUNE_FETCHER_RETRY_HARD_TIMEOUT = EnvVariable[Optional[int]](
90-
"NEPTUNE_FETCHER_RETRY_HARD_TIMEOUT", _lift_optional(int), 600
86+
NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE = EnvVariable[int](
87+
"NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE", int, 10_000
9188
)
92-
NEPTUNE_FETCHER_MAX_WORKERS = EnvVariable[int]("NEPTUNE_FETCHER_MAX_WORKERS", int, 10)
93-
NEPTUNE_FETCHER_SYS_ATTRS_BATCH_SIZE = EnvVariable[int]("NEPTUNE_FETCHER_EXPERIMENT_SYS_ATTRS_BATCH_SIZE", int, 10_000)
94-
NEPTUNE_FETCHER_ATTRIBUTE_DEFINITIONS_BATCH_SIZE = EnvVariable[int](
95-
"NEPTUNE_FETCHER_ATTRIBUTE_DEFINITIONS_BATCH_SIZE", int, 10_000
89+
NEPTUNE_QUERY_ATTRIBUTE_VALUES_BATCH_SIZE = EnvVariable[int]("NEPTUNE_QUERY_ATTRIBUTE_VALUES_BATCH_SIZE", int, 10_000)
90+
NEPTUNE_QUERY_FILES_BATCH_SIZE = EnvVariable[int]("NEPTUNE_QUERY_FILES_BATCH_SIZE", int, 200)
91+
NEPTUNE_QUERY_FILES_MAX_CONCURRENCY = EnvVariable[int]("NEPTUNE_QUERY_FILES_MAX_CONCURRENCY", int, 1)
92+
NEPTUNE_QUERY_FILES_TIMEOUT = EnvVariable[Optional[int]]("NEPTUNE_QUERY_FILES_TIMEOUT", _lift_optional(int), None)
93+
NEPTUNE_QUERY_MAX_REQUEST_SIZE = EnvVariable[int]("NEPTUNE_QUERY_MAX_REQUEST_SIZE", int, 220_000)
94+
NEPTUNE_QUERY_MAX_WORKERS = EnvVariable[int]("NEPTUNE_QUERY_MAX_WORKERS", int, 32)
95+
NEPTUNE_QUERY_RETRY_HARD_TIMEOUT = EnvVariable[Optional[int]](
96+
"NEPTUNE_QUERY_RETRY_HARD_TIMEOUT", _lift_optional(int), 600
9697
)
97-
NEPTUNE_FETCHER_ATTRIBUTE_VALUES_BATCH_SIZE = EnvVariable[int](
98-
"NEPTUNE_FETCHER_ATTRIBUTE_VALUES_BATCH_SIZE", int, 10_000
98+
NEPTUNE_QUERY_RETRY_SOFT_TIMEOUT = EnvVariable[Optional[int]](
99+
"NEPTUNE_QUERY_RETRY_SOFT_TIMEOUT", _lift_optional(int), 300
99100
)
100-
NEPTUNE_FETCHER_SERIES_BATCH_SIZE = EnvVariable[int]("NEPTUNE_FETCHER_SERIES_BATCH_SIZE", int, 10_000)
101-
NEPTUNE_FETCHER_QUERY_SIZE_LIMIT = EnvVariable[int]("NEPTUNE_FETCHER_QUERY_SIZE_LIMIT", int, 220_000)
102-
NEPTUNE_FETCHER_FILES_BATCH_SIZE = EnvVariable[int]("NEPTUNE_FETCHER_FILES_BATCH_SIZE", int, 200)
103-
NEPTUNE_FETCHER_FILES_MAX_CONCURRENCY = EnvVariable[int]("NEPTUNE_FETCHER_FILES_MAX_CONCURRENCY", int, 1)
104-
NEPTUNE_FETCHER_FILES_TIMEOUT = EnvVariable[Optional[int]]("NEPTUNE_FETCHER_FILES_TIMEOUT", _lift_optional(int), None)
105-
106-
NEPTUNE_ENABLE_COLORS = EnvVariable[bool]("NEPTUNE_ENABLE_COLORS", _map_bool, True)
101+
NEPTUNE_QUERY_SERIES_BATCH_SIZE = EnvVariable[int]("NEPTUNE_QUERY_SERIES_BATCH_SIZE", int, 10_000)
102+
NEPTUNE_QUERY_SYS_ATTRS_BATCH_SIZE = EnvVariable[int]("NEPTUNE_QUERY_EXPERIMENT_SYS_ATTRS_BATCH_SIZE", int, 10_000)
103+
NEPTUNE_VERIFY_SSL = EnvVariable[bool]("NEPTUNE_VERIFY_SSL", _map_bool, True)

src/neptune_query/internal/retrieval/attribute_definitions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def fetch_attribute_definitions_single_filter(
5858
project_identifiers: Iterable[identifiers.ProjectIdentifier],
5959
run_identifiers: Optional[Iterable[identifiers.RunIdentifier]],
6060
attribute_filter: filters._AttributeFilter,
61-
batch_size: int = env.NEPTUNE_FETCHER_ATTRIBUTE_DEFINITIONS_BATCH_SIZE.get(),
61+
batch_size: int = env.NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE.get(),
6262
) -> Generator[util.Page[identifiers.AttributeDefinition], None, None]:
6363
params: dict[str, Any] = {
6464
"projectIdentifiers": list(project_identifiers),

src/neptune_query/internal/retrieval/attribute_values.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def fetch_attribute_values(
5454
project_identifier: identifiers.ProjectIdentifier,
5555
run_identifiers: Iterable[identifiers.RunIdentifier],
5656
attribute_definitions: Iterable[identifiers.AttributeDefinition],
57-
batch_size: int = env.NEPTUNE_FETCHER_ATTRIBUTE_VALUES_BATCH_SIZE.get(),
57+
batch_size: int = env.NEPTUNE_QUERY_ATTRIBUTE_VALUES_BATCH_SIZE.get(),
5858
) -> Generator[util.Page[AttributeValue], None, None]:
5959
attribute_definitions_set: set[identifiers.AttributeDefinition] = set(attribute_definitions)
6060
experiments = [str(e) for e in run_identifiers]

src/neptune_query/internal/retrieval/files.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ class DownloadResult:
118118
def download_file(
119119
signed_file: SignedFile,
120120
target_path: pathlib.Path,
121-
max_concurrency: int = env.NEPTUNE_FETCHER_FILES_MAX_CONCURRENCY.get(),
122-
timeout: Optional[int] = env.NEPTUNE_FETCHER_FILES_TIMEOUT.get(),
121+
max_concurrency: int = env.NEPTUNE_QUERY_FILES_MAX_CONCURRENCY.get(),
122+
timeout: Optional[int] = env.NEPTUNE_QUERY_FILES_TIMEOUT.get(),
123123
) -> DownloadResult:
124124
target_path.parent.mkdir(parents=True, exist_ok=True)
125125

@@ -136,8 +136,8 @@ def download_file(
136136
def _download_file_azure(
137137
signed_file: SignedFile,
138138
target_path: pathlib.Path,
139-
max_concurrency: int = env.NEPTUNE_FETCHER_FILES_MAX_CONCURRENCY.get(),
140-
timeout: Optional[int] = env.NEPTUNE_FETCHER_FILES_TIMEOUT.get(),
139+
max_concurrency: int = env.NEPTUNE_QUERY_FILES_MAX_CONCURRENCY.get(),
140+
timeout: Optional[int] = env.NEPTUNE_QUERY_FILES_TIMEOUT.get(),
141141
) -> DownloadResult:
142142
try:
143143
blob_client = AzureBlobClient.from_blob_url(signed_file.url)
@@ -160,7 +160,7 @@ def _download_file_azure(
160160
def _download_file_requests(
161161
signed_file: SignedFile,
162162
target_path: pathlib.Path,
163-
timeout: Optional[int] = env.NEPTUNE_FETCHER_FILES_TIMEOUT.get(),
163+
timeout: Optional[int] = env.NEPTUNE_QUERY_FILES_TIMEOUT.get(),
164164
) -> DownloadResult:
165165
try:
166166
response = requests.get(signed_file.url, stream=True, timeout=timeout)

src/neptune_query/internal/retrieval/retry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
def handle_errors_default(func: Callable[T, Response[R]]) -> Callable[T, Response[R]]:
4545
return retry_backoff(
4646
max_tries=None,
47-
soft_max_time=env.NEPTUNE_FETCHER_RETRY_SOFT_TIMEOUT.get(),
48-
hard_max_time=env.NEPTUNE_FETCHER_RETRY_HARD_TIMEOUT.get(),
47+
soft_max_time=env.NEPTUNE_QUERY_RETRY_SOFT_TIMEOUT.get(),
48+
hard_max_time=env.NEPTUNE_QUERY_RETRY_HARD_TIMEOUT.get(),
4949
backoff_strategy=exponential_backoff(jitter="full"),
5050
)(handle_api_errors(func))
5151

src/neptune_query/internal/retrieval/search.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def __call__(
130130
sort_by: _Attribute = _Attribute("sys/creation_time", type="datetime"),
131131
sort_direction: Literal["asc", "desc"] = "desc",
132132
limit: Optional[int] = None,
133-
batch_size: int = env.NEPTUNE_FETCHER_SYS_ATTRS_BATCH_SIZE.get(),
133+
batch_size: int = env.NEPTUNE_QUERY_SYS_ATTRS_BATCH_SIZE.get(),
134134
container_type: ContainerType = ContainerType.EXPERIMENT,
135135
) -> Generator[util.Page[T], None, None]:
136136
...
@@ -148,7 +148,7 @@ def fetch_sys_attrs(
148148
sort_by: _Attribute = _Attribute("sys/creation_time", type="datetime"),
149149
sort_direction: Literal["asc", "desc"] = "desc",
150150
limit: Optional[int] = None,
151-
batch_size: int = env.NEPTUNE_FETCHER_SYS_ATTRS_BATCH_SIZE.get(),
151+
batch_size: int = env.NEPTUNE_QUERY_SYS_ATTRS_BATCH_SIZE.get(),
152152
container_type: ContainerType = default_container_type,
153153
) -> Generator[util.Page[T], None, None]:
154154
params: dict[str, Any] = {

src/neptune_query/internal/retrieval/split.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ def split_sys_ids(
4949
sys_ids: list[identifiers.SysId],
5050
) -> Generator[list[identifiers.SysId]]:
5151
"""
52-
Splits a sequence of sys ids into batches of size at most `NEPTUNE_FETCHER_QUERY_SIZE_LIMIT`.
52+
Splits a sequence of sys ids into batches of size at most `NEPTUNE_QUERY_MAX_REQUEST_SIZE`.
5353
Use before fetching attribute definitions.
5454
"""
55-
query_size_limit = env.NEPTUNE_FETCHER_QUERY_SIZE_LIMIT.get()
55+
query_size_limit = env.NEPTUNE_QUERY_MAX_REQUEST_SIZE.get()
5656
identifier_num_limit = max(query_size_limit // _sys_id_size(), 1)
5757

5858
identifier_num = len(sys_ids)
@@ -72,14 +72,14 @@ def split_sys_ids_attributes(
7272
) -> Generator[tuple[list[identifiers.SysId], list[identifiers.AttributeDefinition]]]:
7373
"""
7474
Splits a pair of sys ids and attribute_definitions into batches that:
75-
When their length is added it is of size at most `NEPTUNE_FETCHER_QUERY_SIZE_LIMIT`.
76-
When their item count is multiplied, it is at most `NEPTUNE_FETCHER_ATTRIBUTE_VALUES_BATCH_SIZE`.
75+
When their length is added it is of size at most `NEPTUNE_QUERY_MAX_REQUEST_SIZE`.
76+
When their item count is multiplied, it is at most `NEPTUNE_QUERY_ATTRIBUTE_VALUES_BATCH_SIZE`.
7777
7878
It's intended for use before fetching attribute values and assumes that the sys_ids and attribute_definitions
7979
will be sent to the server in a single request and the response will contain data for their cartesian product.
8080
"""
81-
query_size_limit = env.NEPTUNE_FETCHER_QUERY_SIZE_LIMIT.get()
82-
attribute_values_batch_size = env.NEPTUNE_FETCHER_ATTRIBUTE_VALUES_BATCH_SIZE.get()
81+
query_size_limit = env.NEPTUNE_QUERY_MAX_REQUEST_SIZE.get()
82+
attribute_values_batch_size = env.NEPTUNE_QUERY_ATTRIBUTE_VALUES_BATCH_SIZE.get()
8383

8484
if not attribute_definitions:
8585
return
@@ -141,13 +141,13 @@ def _split_attribute_definitions(
141141
def split_series_attributes(items: Iterable[RunAttributeDefinition]) -> Generator[list[RunAttributeDefinition]]:
142142
"""
143143
Splits a list of classes containing an attribute_definition into batches so that:
144-
When the lengths of attribute paths are added, the total length is at most `NEPTUNE_FETCHER_QUERY_SIZE_LIMIT`.
145-
Item count is at most `NEPTUNE_FETCHER_SERIES_BATCH_SIZE`.
144+
When the lengths of attribute paths are added, the total length is at most `NEPTUNE_QUERY_MAX_REQUEST_SIZE`.
145+
Item count is at most `NEPTUNE_QUERY_SERIES_BATCH_SIZE`.
146146
147147
Intended for use before fetching (string, float) series.
148148
"""
149-
query_size_limit = env.NEPTUNE_FETCHER_QUERY_SIZE_LIMIT.get()
150-
batch_size_limit = env.NEPTUNE_FETCHER_SERIES_BATCH_SIZE.get()
149+
query_size_limit = env.NEPTUNE_QUERY_MAX_REQUEST_SIZE.get()
150+
batch_size_limit = env.NEPTUNE_QUERY_SERIES_BATCH_SIZE.get()
151151

152152
if not items:
153153
return
@@ -172,7 +172,7 @@ def _ceil_div(a: int, b: int) -> int:
172172

173173

174174
def split_files(items: list[T]) -> Generator[list[T], None, None]:
175-
batch_size = env.NEPTUNE_FETCHER_FILES_BATCH_SIZE.get()
175+
batch_size = env.NEPTUNE_QUERY_FILES_BATCH_SIZE.get()
176176

177177
for i in range(0, len(items), batch_size):
178178
yield items[i : i + batch_size]

0 commit comments

Comments
 (0)