Skip to content

Commit b8d5da4

Browse files
authored
PY-171 Enrich and enable sending the query context to log_query (#158)
* PY-171 Enrich and enable sending the query context to log_query Add the following to the `X-Neptune-Client-Metadata` header sent in all HTTP requests sent to Neptune: * unique `nq_query_id` to the sent data. (each `nq.*` function called gets a new `nq_query_id`) * custom metadata read from `NEPTUNE_QUERY_METADATA` env (if set) Enable the mechanism: * set `ADD_QUERY_METADATA = True` in `neptune_query.internal.query_metadata_context` * Fix unit test * Code review notes addresses 1. Update docs in `env.py` 2. Update the `log.debug` message for when `NEPTUNE_QUERY_METADATA` is too long 3. Switch to generating the random nq_query_id using `secrets` rather than `random` 4. Add the test files that I missed in the previous commit * Remove test_query_metadata_query_id * Stop sending `x-neptune-client-metadata` to `neptune_api.api.storage.signed_url_generic`
1 parent 1b7c281 commit b8d5da4

File tree

5 files changed

+295
-35
lines changed

5 files changed

+295
-35
lines changed

src/neptune_query/internal/env.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ def _map_logging_level(value: str) -> str:
8989
NEPTUNE_API_TOKEN = EnvVariable[str]("NEPTUNE_API_TOKEN", _map_str)
9090
NEPTUNE_ENABLE_COLORS = EnvVariable[bool]("NEPTUNE_ENABLE_COLORS", _map_bool, True)
9191
NEPTUNE_HTTP_REQUEST_TIMEOUT_SECONDS = EnvVariable[int]("NEPTUNE_HTTP_REQUEST_TIMEOUT_SECONDS", int, 60)
92+
NEPTUNE_LOGGER_LEVEL = EnvVariable[str]("NEPTUNE_LOGGER_LEVEL", _map_logging_level, "WARN")
9293
NEPTUNE_PROJECT = EnvVariable[str]("NEPTUNE_PROJECT", _map_str)
9394
NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE = EnvVariable[int](
9495
"NEPTUNE_QUERY_ATTRIBUTE_DEFINITIONS_BATCH_SIZE", int, 10_000
@@ -97,11 +98,15 @@ def _map_logging_level(value: str) -> str:
9798
NEPTUNE_QUERY_FILES_BATCH_SIZE = EnvVariable[int]("NEPTUNE_QUERY_FILES_BATCH_SIZE", int, 200)
9899
NEPTUNE_QUERY_FILES_MAX_CONCURRENCY = EnvVariable[int]("NEPTUNE_QUERY_FILES_MAX_CONCURRENCY", int, 1)
99100
NEPTUNE_QUERY_FILES_TIMEOUT = EnvVariable[Optional[int]]("NEPTUNE_QUERY_FILES_TIMEOUT", _lift_optional(int), None)
100-
NEPTUNE_QUERY_MAX_REQUEST_SIZE = EnvVariable[int]("NEPTUNE_QUERY_MAX_REQUEST_SIZE", int, 220_000)
101101
NEPTUNE_QUERY_MAX_ATTRIBUTE_FILTER_SIZE = EnvVariable[int](
102102
"NEPTUNE_QUERY_MAX_ATTRIBUTE_FILTER_SIZE", int, 60_000
103103
) # 120_000 bytes seems to be the hard limit, but let's be safe
104+
NEPTUNE_QUERY_MAX_REQUEST_SIZE = EnvVariable[int]("NEPTUNE_QUERY_MAX_REQUEST_SIZE", int, 220_000)
104105
NEPTUNE_QUERY_MAX_WORKERS = EnvVariable[int]("NEPTUNE_QUERY_MAX_WORKERS", int, 32)
106+
# Metadata to be attached to each query for tracking purposes (e.g. user info, experiment context, etc.).
107+
# Accepts a plain string or JSON-encoded string. After we JSON-encode the value (using Python's json.dumps),
108+
# it must fit in 82 bytes. Otherwise, the client sends a placeholder string instead.
109+
NEPTUNE_QUERY_METADATA = EnvVariable[str]("NEPTUNE_QUERY_METADATA", _map_str, "")
105110
NEPTUNE_QUERY_RETRY_HARD_TIMEOUT = EnvVariable[Optional[int]](
106111
"NEPTUNE_QUERY_RETRY_HARD_TIMEOUT", _lift_optional(int), 3600
107112
)
@@ -111,4 +116,3 @@ def _map_logging_level(value: str) -> str:
111116
NEPTUNE_QUERY_SERIES_BATCH_SIZE = EnvVariable[int]("NEPTUNE_QUERY_SERIES_BATCH_SIZE", int, 10_000)
112117
NEPTUNE_QUERY_SYS_ATTRS_BATCH_SIZE = EnvVariable[int]("NEPTUNE_QUERY_EXPERIMENT_SYS_ATTRS_BATCH_SIZE", int, 10_000)
113118
NEPTUNE_VERIFY_SSL = EnvVariable[bool]("NEPTUNE_VERIFY_SSL", _map_bool, True)
114-
NEPTUNE_LOGGER_LEVEL = EnvVariable[str]("NEPTUNE_LOGGER_LEVEL", _map_logging_level, "WARN")

src/neptune_query/internal/query_metadata_context.py

Lines changed: 81 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
#
1616
from __future__ import annotations
1717

18+
import base64
1819
import contextlib
19-
import dataclasses
2020
import functools
2121
import json
22-
from dataclasses import dataclass
22+
import logging
23+
import secrets
2324
from importlib.metadata import (
2425
PackageNotFoundError,
2526
version,
@@ -34,38 +35,56 @@
3435

3536
from neptune_api.types import Response
3637

38+
from neptune_query.internal import env
3739
from neptune_query.internal.composition import concurrency
3840

3941
# This flag is used to control whether query metadata should be added to the request headers
40-
# Can remove this after backend supports this properly (under ticket PY-171)
41-
ADD_QUERY_METADATA = False
42+
ADD_QUERY_METADATA = True
4243

43-
44-
@functools.cache
45-
def get_client_version() -> str:
46-
package_name = "neptune-query"
47-
try:
48-
package_version = version(package_name)
49-
except PackageNotFoundError:
50-
package_version = "unknown"
51-
return f"{package_name}/{package_version}"
44+
logger = logging.getLogger(__name__)
5245

5346

54-
@dataclass
5547
class QueryMetadata:
56-
api_function: str
57-
client_version: str
58-
59-
def __post_init__(self) -> None:
60-
self.api_function = self.api_function[:50]
61-
self.client_version = self.client_version[:50]
48+
def __init__(
49+
self,
50+
api_function: str,
51+
client_version: str,
52+
nq_query_id: str,
53+
user_data: str | None = None,
54+
) -> None:
55+
56+
# longest observed: "fetch_experiments_table_global" - 30 characters
57+
self.api_function = api_function[:32]
58+
59+
# longest observed: "nq/1.8.0b1.post8+cea1d73" - 24 characters
60+
self.client_version = client_version[:24]
61+
62+
# nq_query_id should be exactly 8 characters
63+
self.nq_query_id = nq_query_id[:8]
64+
65+
# user_data may contribute at most 80 characters when JSON-encoded
66+
self.user_data: str | dict | None = _process_user_data(user_data)
67+
68+
def to_json(self) -> str:
69+
return json.dumps(
70+
{
71+
"fn": self.api_function,
72+
"v": self.client_version,
73+
"qid": self.nq_query_id,
74+
"ud": self.user_data,
75+
}
76+
)
6277

6378

6479
@contextlib.contextmanager
6580
def use_query_metadata(api_function: str) -> Generator[None, None, None]:
81+
user_data = env.NEPTUNE_QUERY_METADATA.get()
82+
6683
query_metadata = QueryMetadata(
6784
api_function=api_function,
68-
client_version=get_client_version(),
85+
client_version=_get_client_version(),
86+
nq_query_id=_generate_nq_query_id(),
87+
user_data=user_data if user_data else None,
6988
)
7089
with concurrency.use_thread_local({"query_metadata": query_metadata}):
7190
yield
@@ -82,7 +101,47 @@ def wrapper(*args: T.args, **kwargs: T.kwargs) -> Response[R]:
82101
"query_metadata", expected_type=QueryMetadata
83102
)
84103
if ADD_QUERY_METADATA and query_metadata:
85-
kwargs["x_neptune_client_metadata"] = json.dumps(dataclasses.asdict(query_metadata))
104+
kwargs["x_neptune_client_metadata"] = query_metadata.to_json()
86105
return func(*args, **kwargs)
87106

88107
return wrapper
108+
109+
110+
@functools.cache
111+
def _get_client_version() -> str:
112+
try:
113+
package_version = version("neptune-query")
114+
except PackageNotFoundError:
115+
package_version = "unknown"
116+
return f"nq/{package_version}"
117+
118+
119+
@functools.cache
120+
def _process_user_data(user_data: str | None) -> str | dict | None:
121+
if not user_data:
122+
return None
123+
124+
try:
125+
user_data = json.loads(user_data)
126+
user_data_is_proper_json = True
127+
except json.JSONDecodeError:
128+
user_data_is_proper_json = False
129+
130+
if len(json.dumps(user_data)) > 82:
131+
user_data = "NEPTUNE_QUERY_METADATA too long"
132+
if user_data_is_proper_json:
133+
logger.debug("User data in NEPTUNE_QUERY_METADATA env too long. Skipping user data in query metadata.")
134+
else:
135+
logger.debug(
136+
"User data in NEPTUNE_QUERY_METADATA env too long (and not JSON-encoded). "
137+
"Skipping user data in query metadata."
138+
)
139+
140+
return user_data
141+
142+
143+
def _generate_nq_query_id() -> str:
144+
# Generate a random string of exactly 8 URL-safe base64 characters (a-z, A-Z, 0-9, -, _):
145+
nq_query_id_bytes = secrets.token_bytes(6)
146+
nq_query_id = base64.urlsafe_b64encode(nq_query_id_bytes).decode("ascii")
147+
return nq_query_id

src/neptune_query/internal/retrieval/files.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
Provider,
3636
)
3737

38-
from neptune_query.internal.query_metadata_context import with_neptune_client_metadata
39-
4038
from ...exceptions import NeptuneFileDownloadError
4139
from ...types import File
4240
from .. import (
@@ -72,7 +70,8 @@ def fetch_signed_urls(
7270

7371
logger.debug(f"Calling signed_url_generic with body: {body}")
7472

75-
call_api = retry.handle_errors_default(with_neptune_client_metadata(signed_url_generic.sync_detailed))
73+
# skip with_neptune_client_metadata - storage bridge API doesn't support it
74+
call_api = retry.handle_errors_default(signed_url_generic.sync_detailed)
7675
response = call_api(client=client, body=body)
7776

7877
logger.debug(

tests/unit/internal/test_query_metadata_context.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from __future__ import annotations
1717

1818
import json
19+
from base64 import b64decode
1920
from unittest.mock import Mock
2021

2122
import mock
@@ -30,14 +31,18 @@
3031

3132
def test_query_metadata_truncation() -> None:
3233
# given
33-
long_string = "a" * 100
34-
metadata = QueryMetadata(api_function=long_string, client_version=long_string)
34+
metadata = QueryMetadata(
35+
api_function="a" * 100,
36+
client_version="b" * 100,
37+
nq_query_id="c" * 100,
38+
user_data=None,
39+
)
3540

3641
# then
37-
assert len(metadata.api_function) == 50
38-
assert len(metadata.client_version) == 50
39-
assert metadata.api_function == "a" * 50
40-
assert metadata.client_version == "a" * 50
42+
assert len(metadata.to_json()) < 200
43+
assert metadata.api_function == "a" * 32
44+
assert metadata.client_version == "b" * 24
45+
assert metadata.nq_query_id == "c" * 8
4146

4247

4348
def test_use_query_metadata() -> None:
@@ -74,7 +79,15 @@ def test_with_neptune_client_metadata_with_context() -> None:
7479
# when
7580
with (
7681
mock.patch("neptune_query.internal.query_metadata_context.ADD_QUERY_METADATA", True),
77-
mock.patch("neptune_query.internal.query_metadata_context.get_client_version", return_value="1.2.3"),
82+
mock.patch("neptune_query.internal.query_metadata_context._get_client_version", return_value="1.2.3"),
83+
mock.patch(
84+
"neptune_query.internal.query_metadata_context.secrets.token_bytes",
85+
return_value=b64decode("abcd1234"),
86+
),
87+
mock.patch(
88+
"neptune_query.internal.query_metadata_context.env.NEPTUNE_QUERY_METADATA.get",
89+
return_value='{"magic_number": 42, "names": ["John", "Larry"]}',
90+
),
7891
):
7992
decorated_call = with_neptune_client_metadata(mock_api_call)
8093
with use_query_metadata(api_function="test_api"):
@@ -84,5 +97,7 @@ def test_with_neptune_client_metadata_with_context() -> None:
8497
mock_api_call.assert_called_once()
8598
_, kwargs = mock_api_call.call_args
8699
assert "x_neptune_client_metadata" in kwargs
87-
expected_json = json.dumps({"api_function": "test_api", "client_version": "1.2.3"})
100+
expected_json = json.dumps(
101+
{"fn": "test_api", "v": "1.2.3", "qid": "abcd1234", "ud": {"magic_number": 42, "names": ["John", "Larry"]}}
102+
)
88103
assert kwargs["x_neptune_client_metadata"] == expected_json

0 commit comments

Comments
 (0)