Skip to content

Commit fb299f9

Browse files
committed
feat(py-client): Add Python client implementation
1 parent 5c0e09f commit fb299f9

File tree

6 files changed

+589
-6
lines changed

6 files changed

+589
-6
lines changed

python-objectstore-client/pyproject.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ version = "0.1.0"
44
description = "Python client for the Sentry Objectstore service"
55
readme = "README.md"
66
requires-python = ">=3.11.9"
7-
dependencies = []
7+
dependencies = [
8+
"sentry-sdk>=2.42.1",
9+
"urllib3>=2.5.0",
10+
"zstandard>=0.18.0",
11+
]
812

913
[build-system]
1014
requires = ["uv_build==0.9.3"] # must match the version specified in devenv/config.ini
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
from __future__ import annotations
2+
3+
from io import BytesIO
4+
from typing import IO, Literal, NamedTuple, NotRequired, Self, TypedDict, cast
5+
from urllib.parse import urlencode
6+
7+
import sentry_sdk
8+
import urllib3
9+
import zstandard
10+
from urllib3.connectionpool import HTTPConnectionPool
11+
12+
from objectstore_client.metadata import (
13+
HEADER_EXPIRATION,
14+
HEADER_META_PREFIX,
15+
Compression,
16+
ExpirationPolicy,
17+
Metadata,
18+
format_expiration,
19+
)
20+
from objectstore_client.metrics import (
21+
MetricsBackend,
22+
NoOpMetricsBackend,
23+
measure_storage_operation,
24+
)
25+
26+
Permission = Literal["read", "write"]
27+
28+
29+
class Scope(TypedDict):
    """Storage scope for blobs: always an organization, optionally narrowed
    to a single project within it."""

    # Owning organization id.
    organization: int
    # Optional project id; when absent the scope covers the whole organization.
    project: NotRequired[int]
32+
33+
34+
class GetResult(NamedTuple):
    """Result of `Client.get`: parsed blob metadata plus a readable payload stream."""

    # Metadata parsed from the response headers (compression, expiration, custom).
    metadata: Metadata
    # File-like stream of the blob contents; decompressed unless the caller
    # opted out via `decompress=False`.
    payload: IO[bytes]
37+
38+
39+
class ClientBuilder:
    """Factory for scoped `Client` instances.

    Holds settings shared by all clients (base URL, usecase, metrics,
    trace propagation); call `for_organization` / `for_project` to obtain
    a `Client` bound to a concrete scope.
    """

    def __init__(
        self,
        objectstore_base_url: str,
        usecase: str,
        options: dict | None = None,
        metrics_backend: MetricsBackend | None = None,
        propagate_traces: bool = False,
    ):
        # `options` is accepted for forward compatibility but currently unused.
        _ = options
        self._base_url = objectstore_base_url
        self._usecase = usecase
        self._propagate_traces = propagate_traces
        self._metrics_backend = metrics_backend or NoOpMetricsBackend()
        # Compression applied to uploads unless a call overrides it.
        self._default_compression: Compression = "zstd"

    def default_compression(self, default_compression: Compression) -> Self:
        """Set the compression used for uploads by default (fluent setter)."""
        self._default_compression = default_compression
        return self

    def for_organization(self, organization_id: int) -> Client:
        """Build a client scoped to an entire organization."""
        return self._make_client(f"org.{organization_id}")

    def for_project(self, organization_id: int, project_id: int) -> Client:
        """Build a client scoped to a single project of an organization."""
        return self._make_client(f"org.{organization_id}/proj.{project_id}")

    def _make_client(self, scope: str) -> Client:
        # A fresh connection pool per client, derived from the base URL.
        pool = urllib3.connectionpool.connection_from_url(self._base_url)
        return Client(
            pool=pool,
            default_compression=self._default_compression,
            usecase=self._usecase,
            scope=scope,
            propagate_traces=self._propagate_traces,
            metrics_backend=self._metrics_backend,
        )
75+
76+
77+
class Client:
    """Scoped HTTP client for the objectstore service.

    Instances are created via `ClientBuilder.for_organization` /
    `ClientBuilder.for_project`; every request carries the configured
    usecase and scope as query parameters.
    """

    _default_compression: Compression

    def __init__(
        self,
        pool: HTTPConnectionPool,
        default_compression: Compression,
        usecase: str,
        scope: str,
        propagate_traces: bool,
        metrics_backend: MetricsBackend,
    ):
        self._pool = pool
        self._default_compression = default_compression
        self._usecase = usecase
        self._scope = scope
        self._propagate_traces = propagate_traces
        self._metrics_backend = metrics_backend

    def _make_headers(self) -> dict[str, str]:
        # Forward the current Sentry trace context so server-side spans
        # link up with the caller's trace.
        if self._propagate_traces:
            return dict(sentry_sdk.get_current_scope().iter_trace_propagation_headers())
        return {}

    def _make_url(self, id: str | None, full: bool = False) -> str:
        # With `full=True` an absolute URL is built (see `object_url`);
        # otherwise the path is relative to the connection pool's host.
        base_path = f"/v1/{id}" if id else "/v1/"
        qs = urlencode({"usecase": self._usecase, "scope": self._scope})
        if full:
            return f"http://{self._pool.host}:{self._pool.port}{base_path}?{qs}"
        else:
            return f"{base_path}?{qs}"

    def put(
        self,
        contents: bytes | IO[bytes],
        id: str | None = None,
        compression: Compression | Literal["none"] | None = None,
        metadata: dict[str, str] | None = None,
        expiration_policy: ExpirationPolicy | None = None,
    ) -> str:
        """
        Uploads the given `contents` to blob storage and returns the blob key.

        If no `id` is provided, one will be automatically generated and returned
        from this function.

        The client will select the configured `default_compression` if none is given
        explicitly.
        This can be overridden by explicitly giving a `compression` argument.
        Providing `"none"` as the argument will instruct the client to not apply
        any compression to this upload, which is useful for uncompressible formats.

        Raises `ClientError` if the service responds with an error status.
        """
        headers = self._make_headers()
        body = BytesIO(contents) if isinstance(contents, bytes) else contents
        original_body: IO[bytes] = body

        compression = compression or self._default_compression
        if compression == "zstd":
            # Compress lazily while urllib3 streams the request body.
            cctx = zstandard.ZstdCompressor()
            body = cctx.stream_reader(original_body)
            headers["Content-Encoding"] = "zstd"

        if expiration_policy:
            headers[HEADER_EXPIRATION] = format_expiration(expiration_policy)

        if metadata:
            for k, v in metadata.items():
                headers[f"{HEADER_META_PREFIX}{k}"] = v

        with measure_storage_operation(self._metrics_backend, "put", self._usecase) as metric_emitter:
            response = self._pool.request(
                "PUT",
                self._make_url(id),
                body=body,
                headers=headers,
                preload_content=True,
                decode_content=True,
            )
            raise_for_status(response)
            res = response.json()

            # Must do this after streaming `body` as that's what is responsible
            # for advancing the seek position in both streams
            metric_emitter.record_uncompressed_size(original_body.tell())
            if compression and compression != "none":
                metric_emitter.record_compressed_size(body.tell(), compression)
        return res["key"]

    def get(self, id: str, decompress: bool = True) -> GetResult:
        """
        This fetches the blob with the given `id`, returning an `IO` stream that
        can be read.

        By default, content that was uploaded compressed will be automatically
        decompressed, unless `decompress=False` is passed.

        Raises `ClientError` if the service responds with an error status.
        """

        headers = self._make_headers()
        with measure_storage_operation(self._metrics_backend, "get", self._usecase):
            response = self._pool.request(
                "GET",
                self._make_url(id),
                preload_content=False,
                decode_content=False,
                headers=headers,
            )
            raise_for_status(response)
            # The raw urllib3 response is file-like; hand it to the caller as
            # the payload stream instead of materializing it in memory.
            stream = cast(IO[bytes], response)
            metadata = Metadata.from_headers(response.headers)

        if metadata.compression and decompress:
            if metadata.compression != "zstd":
                raise NotImplementedError("Transparent decoding of anything but `zstd` is not implemented yet")

            # The caller receives decompressed bytes, so the returned metadata
            # must not advertise a compression anymore.
            metadata.compression = None
            dctx = zstandard.ZstdDecompressor()
            stream = dctx.stream_reader(stream, read_across_frames=True)

        return GetResult(metadata, stream)

    def object_url(self, id: str) -> str:
        """
        Generates a GET url to the object with the given `id`.

        This can then be used by downstream services to fetch the given object.
        NOTE however that the service does not strictly follow HTTP semantics,
        in particular in relation to `Accept-Encoding`.
        """
        return self._make_url(id, full=True)

    def delete(self, id: str):
        """
        Deletes the blob with the given `id`.

        Raises `ClientError` if the service responds with an error status.
        """

        headers = self._make_headers()
        with measure_storage_operation(self._metrics_backend, "delete", self._usecase):
            response = self._pool.request(
                "DELETE",
                self._make_url(id),
                headers=headers,
            )
            raise_for_status(response)
221+
222+
223+
class ClientError(Exception):
    """Raised when the objectstore service answers with an error status."""

    def __init__(self, message: str, status: int, response: str):
        super().__init__(message)
        # Keep the HTTP status code and raw response body around so callers
        # can inspect the failure programmatically.
        self.status = status
        self.response = response
228+
229+
230+
def raise_for_status(response: urllib3.BaseHTTPResponse):
    """Raise a `ClientError` for any 4xx/5xx response; no-op otherwise."""
    if response.status < 400:
        return
    # Prefer already-buffered data; fall back to reading the body.
    # NOTE(review): `str()` on a bytes body yields its repr ("b'...'") —
    # confirm whether decoding was intended here.
    body = str(response.data or response.read())
    raise ClientError(
        f"Objectstore request failed with status {response.status}",
        response.status,
        body,
    )
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from __future__ import annotations
2+
3+
import itertools
4+
import re
5+
from collections.abc import Mapping
6+
from dataclasses import dataclass
7+
from datetime import timedelta
8+
from typing import Literal, cast
9+
10+
# Compression algorithms the client understands; currently only zstd.
Compression = Literal["zstd"]

# Header carrying a blob's expiration policy ("tti:..." / "ttl:...").
HEADER_EXPIRATION = "x-sn-expiration"
# Prefix for headers carrying user-provided custom metadata entries.
HEADER_META_PREFIX = "x-snme-"
14+
15+
16+
@dataclass
class TimeToIdle:
    """Expiration policy: the blob expires `delta` after it was last accessed."""

    delta: timedelta
19+
20+
21+
@dataclass
class TimeToLive:
    """Expiration policy: the blob expires a fixed `delta` after creation."""

    delta: timedelta
24+
25+
26+
# A blob either expires after inactivity (TTI) or after a fixed lifetime (TTL).
ExpirationPolicy = TimeToIdle | TimeToLive
27+
28+
29+
@dataclass
class Metadata:
    """Server-side metadata attached to a stored blob."""

    # Compression the payload is stored/served with, or None if uncompressed.
    compression: Compression | None
    # Expiration policy parsed from the expiration header, or None.
    expiration_policy: ExpirationPolicy | None
    # User-provided key/value metadata, with the header prefix stripped.
    custom: dict[str, str]

    @classmethod
    def from_headers(cls, headers: Mapping[str, str]) -> Metadata:
        """Parse blob metadata out of HTTP response headers.

        NOTE(review): the comparisons assume header keys arrive lowercase
        ("content-encoding", "x-sn-..."); urllib3 preserves the casing the
        server sent when iterating — confirm the service emits lowercase.
        """
        compression = None
        expiration_policy = None
        custom_metadata = {}
        for k, v in headers.items():
            if k == "content-encoding":
                # Trusts the server to only send supported encodings ("zstd").
                compression = cast(Compression | None, v)
            elif k == HEADER_EXPIRATION:
                expiration_policy = parse_expiration(v)
            elif k.startswith(HEADER_META_PREFIX):
                # Strip the prefix to recover the user's original key.
                custom_metadata[k[len(HEADER_META_PREFIX) :]] = v
        return Metadata(compression, expiration_policy, custom_metadata)
48+
49+
50+
def format_expiration(expiration_policy: ExpirationPolicy) -> str:
    """Serialize an expiration policy into its on-wire `tti:`/`ttl:` form."""
    for policy_type, tag in ((TimeToIdle, "tti"), (TimeToLive, "ttl")):
        if isinstance(expiration_policy, policy_type):
            return f"{tag}:{format_timedelta(expiration_policy.delta)}"
55+
56+
57+
def parse_expiration(value: str) -> ExpirationPolicy | None:
    """Parse a `tti:<duration>` / `ttl:<duration>` header value.

    Returns None for unrecognized values.
    """
    for tag, policy_type in (("tti:", TimeToIdle), ("ttl:", TimeToLive)):
        if value.startswith(tag):
            return policy_type(parse_timedelta(value[len(tag) :]))

    return None
64+
65+
66+
def format_timedelta(delta: timedelta) -> str:
    """Render a timedelta as "<D> days <S> seconds", omitting zero components.

    Sub-second precision (microseconds) is not represented.
    """
    parts = []
    if delta.days:
        parts.append(f"{delta.days} days")
    if delta.seconds:
        parts.append(f"{delta.seconds} seconds")
    return " ".join(parts)
75+
76+
77+
# Tokenizes a duration string into alternating runs of letters and digits,
# e.g. "2 days 5 seconds" -> ["2", "days", "5", "seconds"].
TIME_SPLIT = re.compile(r"[^\W\d_]+|\d+")
78+
79+
80+
def parse_timedelta(delta: str) -> timedelta:
81+
words = TIME_SPLIT.findall(delta)
82+
seconds = 0
83+
84+
for num, unit in itertools.batched(words, n=2, strict=True):
85+
num = int(num)
86+
multiplier = 0
87+
88+
if unit.startswith("w"):
89+
multiplier = 86400 * 7
90+
elif unit.startswith("d"):
91+
multiplier = 86400
92+
elif unit.startswith("h"):
93+
multiplier = 3600
94+
elif unit.startswith("m") and not unit.startswith("ms"):
95+
multiplier = 60
96+
elif unit.startswith("s"):
97+
multiplier = 1
98+
99+
seconds += num * multiplier
100+
101+
return timedelta(seconds=seconds)

0 commit comments

Comments
 (0)