
Commit 7cfd09e

add latency, throughput, and size metrics for get/put/delete storage operations (#97277)
logs:

- request latency
- compressed and uncompressed sizes
- compressed and uncompressed throughput (bytes per second)
- compressed and uncompressed _inverse throughput_ (seconds per byte)
- compression ratio

throughputs and compression ratio require one or both of the size metrics. if those aren't known, the metrics that depend on them won't be logged. (a worked example with made-up numbers follows this message.)

nuances:

- profiling reads are in vroom, so i can't instrument them in this PR. i didn't see explicit deletes. also, i added a missing PUT case.
- there isn't a good way to get the uncompressed size for replays. their dashboard currently relies on their own metrics from ingestion for that.
- the nodestore metrics are the layer above where (de)compression is applied, so:
  - the latencies include (de)compression, whereas the other instances of these metrics do not.
  - while the compressed PUT size is still logged with [the one-off log arpad added](https://github.com/getsentry/sentry/blob/06e943abf26a0566339b00a25324b8e0d212cab6/src/sentry/utils/kvstore/bigtable.py#L244-L249), the compressed throughputs and compression ratio are not, because the metric emitter can't get to it.
- it's not currently clear to me where the right place is to instrument artifact-bundles and debug-files. the PUT size metrics arpad added are still intact, but i'll have to double back to add latency, throughput, and get/delete metrics.

### Legal Boilerplate

Look, I get it. The entity doing business as "Sentry" was incorporated in the State of Delaware in 2015 as Functional Software, Inc. and is gonna need some rights from me in order to utilize my contributions in this here PR. So here's the deal: I retain all rights, title and interest in and to my contributions, and by keeping this boilerplate intact I confirm that Sentry can use, modify, copy, and redistribute my contributions, under Sentry's choice of terms.

---------

Co-authored-by: Arpad Borsos <[email protected]>
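As referenced above, here is a worked example of the derived metrics. All numbers are illustrative, not taken from any real payload:

```python
# Illustrative numbers only: a 4 MiB payload that compresses to 1 MiB
# and takes 0.5 s to upload.
uncompressed_size = 4 * 1024 * 1024  # bytes
compressed_size = 1 * 1024 * 1024  # bytes
elapsed = 0.5  # seconds

# throughput: bytes per second, one value per known size
compressed_throughput = compressed_size / elapsed  # 2 MiB/s
uncompressed_throughput = uncompressed_size / elapsed  # 8 MiB/s

# inverse throughput: seconds per byte
compressed_inverse_throughput = elapsed / compressed_size  # ~4.77e-07 s/byte

# compression ratio: compressed over uncompressed, so smaller is better
compression_ratio = compressed_size / uncompressed_size  # 0.25
```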

8 files changed: +173, -86 lines

src/sentry/filestore/gcs.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -175,6 +175,8 @@ def __init__(self, name, mode, storage):
 
     @property
     def size(self):
+        if self.blob.size is None:
+            self.blob.reload()
         return self.blob.size
 
     @property
```
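Context for this change: in the google-cloud-storage client, `Blob.size` is populated from object metadata and stays `None` until that metadata has been fetched; `blob.reload()` issues the metadata request. This lazy fetch is the extra round-trip that the attachment `getfile` change below knowingly pays in order to report a compressed size.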

src/sentry/models/eventattachment.py

Lines changed: 13 additions & 16 deletions
```diff
@@ -18,9 +18,8 @@
 from sentry.db.models.manager.base_query_set import BaseQuerySet
 from sentry.models.files.utils import get_size_and_checksum, get_storage
 from sentry.objectstore import attachments
-from sentry.objectstore.metrics import measure_storage_put
+from sentry.objectstore.metrics import measure_storage_operation
 from sentry.options.rollout import in_random_rollout
-from sentry.utils import metrics
 
 # Attachment file types that are considered a crash report (PII relevant)
 CRASH_REPORT_TYPES = ("event.minidump", "event.applecrashreport")
@@ -121,7 +120,8 @@ def delete(self, *args: Any, **kwargs: Any) -> tuple[int, dict[str, int]]:
 
         elif self.blob_path.startswith(V1_PREFIX):
             storage = get_storage()
-            storage.delete(self.blob_path)
+            with measure_storage_operation("delete", "attachments"):
+                storage.delete(self.blob_path)
 
         elif self.blob_path.startswith(V2_PREFIX):
             organization_id = _get_organization(self.project_id)
@@ -143,7 +143,13 @@ def getfile(self) -> IO[bytes]:
 
         elif self.blob_path.startswith(V1_PREFIX):
             storage = get_storage()
-            compressed_blob = storage.open(self.blob_path)
+            with measure_storage_operation("get", "attachments", self.size) as metric_emitter:
+                compressed_blob = storage.open(self.blob_path)
+                # We want to log the compressed size here but we want to stream the payload.
+                # Accessing `.size` does additional metadata requests, for which we
+                # just swallow the costs.
+                metric_emitter.record_compressed_size(compressed_blob.size, "zstd")
+
             dctx = zstandard.ZstdDecompressor()
             return dctx.stream_reader(compressed_blob, read_across_frames=True)
 
@@ -168,17 +174,6 @@ def putfile(cls, project_id: int, attachment: CachedAttachment) -> PutfileResult:
         blob = BytesIO(data)
         size, checksum = get_size_and_checksum(blob)
 
-        # TODO: we measure the uncompressed size for inline stored attachments as well,
-        # however moving to V2 storage would mean we would eather double count
-        # when leaving this metric here in place, or miss inline-stored attachments
-        # when removing this metric and only rely on the one in the V2 Client API.
-        metrics.distribution(
-            "storage.put.size",
-            size,
-            tags={"usecase": "attachments", "compression": "none"},
-            unit="byte",
-        )
-
         if can_store_inline(data):
             blob_path = ":" + data.decode()
 
@@ -188,7 +183,9 @@ def putfile(cls, project_id: int, attachment: CachedAttachment) -> PutfileResult:
             storage = get_storage()
             compressed_blob = zstandard.compress(data)
 
-            with measure_storage_put(len(compressed_blob), "attachments", "zstd"):
+            with measure_storage_operation(
+                "put", "attachments", size, len(compressed_blob), "zstd"
+            ):
                 storage.save(blob_path, BytesIO(compressed_blob))
 
         else:
```
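The `compressed_blob.size` access in the `getfile` hunk is what exercises the `gcs.py` change above: when the blob's metadata hasn't been loaded yet, the `size` property now triggers a `reload()`, which is the extra metadata request the inline comment says it swallows.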

src/sentry/nodestore/base.py

Lines changed: 1 addition & 3 deletions
```diff
@@ -10,7 +10,6 @@
 from django.utils.functional import cached_property
 
 from sentry import options
-from sentry.objectstore.metrics import measure_storage_put
 from sentry.utils import json, metrics
 from sentry.utils.services import Service
 
@@ -232,8 +231,7 @@ def set_bytes(self, item_id: str, data: bytes, ttl: timedelta | None = None) ->
         >>> nodestore.set_bytes('key1', b"{'foo': 'bar'}")
         """
         metrics.distribution("nodestore.set_bytes", len(data))
-        with measure_storage_put(len(data), "nodestore"):
-            return self._set_bytes(item_id, data, ttl)
+        return self._set_bytes(item_id, data, ttl)
 
     def _set_bytes(self, item_id: str, data: bytes, ttl: timedelta | None = None) -> None:
         raise NotImplementedError
```
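The put measurement is dropped from this backend-agnostic base class and re-added in the Bigtable backend below, one layer down where the KV store applies (de)compression. That is the reason for the caveat in the commit message: nodestore latencies include (de)compression time, unlike the other instances of these metrics.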

src/sentry/nodestore/bigtable/backend.py

Lines changed: 20 additions & 5 deletions
```diff
@@ -7,6 +7,7 @@
 import sentry_sdk
 
 from sentry.nodestore.base import NodeStorage
+from sentry.objectstore.metrics import measure_storage_operation
 from sentry.utils.kvstore.bigtable import BigtableKVStorage
 
 
@@ -65,24 +66,37 @@ def __init__(
 
     @sentry_sdk.tracing.trace
     def _get_bytes(self, id: str) -> bytes | None:
-        return self.store.get(id)
+        # Note: This metric encapsulates any decompression performed by `self.store.get()`. Other
+        # instances of this metric stop measuring before decompression happens.
+        with measure_storage_operation("get", "nodestore") as metric_emitter:
+            result = self.store.get(id)
+            if result:
+                metric_emitter.record_uncompressed_size(len(result))
+            return result
 
     @sentry_sdk.tracing.trace
     def _get_bytes_multi(self, id_list: list[str]) -> dict[str, bytes | None]:
         rv: dict[str, bytes | None] = {id: None for id in id_list}
-        rv.update(self.store.get_many(id_list))
+        # Note: This metric encapsulates any decompression performed by `self.store.get_many()`. Other
+        # instances of this metric stop measuring before decompression happens.
+        with measure_storage_operation("get-multi", "nodestore"):
+            rv.update(self.store.get_many(id_list))
         return rv
 
     def _set_bytes(self, id: str, data: Any, ttl: timedelta | None = None) -> None:
-        self.store.set(id, data, ttl)
+        # Note: This metric encapsulates any compression performed by `self.store.put()`. Other
+        # instances of this metric start measuring after compression happens.
+        with measure_storage_operation("put", "nodestore", len(data)):
+            self.store.set(id, data, ttl)
 
     def delete(self, id: str) -> None:
         if self.skip_deletes:
             return
 
         with sentry_sdk.start_span(op="nodestore.bigtable.delete"):
             try:
-                self.store.delete(id)
+                with measure_storage_operation("delete", "nodestore"):
+                    self.store.delete(id)
             finally:
                 self._delete_cache_item(id)
 
@@ -98,7 +112,8 @@ def delete_multi(self, id_list: list[str]) -> None:
             return
 
         try:
-            self.store.delete_many(id_list)
+            with measure_storage_operation("delete-multi", "nodestore"):
+                self.store.delete_many(id_list)
         finally:
             self._delete_cache_items(id_list)
 
```
src/sentry/objectstore/metrics.py

Lines changed: 96 additions & 22 deletions
```diff
@@ -1,37 +1,111 @@
 import time
 from collections.abc import Generator
 from contextlib import contextmanager
-from dataclasses import dataclass
 
 from sentry.utils import metrics
 
 
-@dataclass
-class UploadMeasurement:
-    upload_size: int | None
-    compression: str | None
+class StorageMetricEmitter:
+    def __init__(self, operation: str, usecase: str):
+        self.operation = operation
+        self.usecase = usecase
+
+        # These may be set during or after the enclosed operation
+        self.start: int | None = None
+        self.elapsed: float | None = None
+        self.uncompressed_size: int | None = None
+        self.compressed_size: int | None = None
+        self.compression: str = "unknown"
+
+    def record_latency(self, elapsed: float):
+        tags = {"usecase": self.usecase}
+        metrics.timing(f"storage.{self.operation}.latency", elapsed, tags=tags, precise=True)
+        self.elapsed = elapsed
+
+    def record_uncompressed_size(self, value: int):
+        tags = {"usecase": self.usecase, "compression": "none"}
+        metrics.distribution(
+            f"storage.{self.operation}.size", value, tags=tags, unit="byte", precise=True
+        )
+        self.uncompressed_size = value
+
+    def record_compressed_size(self, value: int, compression: str = "unknown"):
+        tags = {"usecase": self.usecase, "compression": compression}
+        metrics.distribution(
+            f"storage.{self.operation}.size", value, tags=tags, unit="byte", precise=True
+        )
+        self.compressed_size = value
+        self.compression = compression
+
+    def maybe_record_compression_ratio(self):
+        if not self.uncompressed_size or not self.compressed_size:
+            return
+
+        tags = {"usecase": self.usecase, "compression": self.compression}
+        metrics.distribution(
+            f"storage.{self.operation}.compression_ratio",
+            self.compressed_size / self.uncompressed_size,
+            tags=tags,
+            precise=True,
+        )
+
+    def maybe_record_throughputs(self):
+        if not self.elapsed or self.elapsed <= 0:
+            return
+
+        sizes = []
+        if self.uncompressed_size:
+            sizes.append((self.uncompressed_size, "none"))
+        if self.compressed_size:
+            sizes.append((self.compressed_size, self.compression))
+
+        for size, compression in sizes:
+            tags = {"usecase": self.usecase, "compression": compression}
+            metrics.distribution(
+                f"storage.{self.operation}.throughput", size / self.elapsed, tags=tags, precise=True
+            )
+            metrics.distribution(
+                f"storage.{self.operation}.inverse_throughput",
+                self.elapsed / size,
+                tags=tags,
+                precise=True,
+            )
 
 
 @contextmanager
-def measure_storage_put(
-    upload_size: int | None, usecase: str, compression: str | None = None
-) -> Generator[UploadMeasurement]:
-    measurement = UploadMeasurement(upload_size, compression)
+def measure_storage_operation(
+    operation: str,
+    usecase: str,
+    uncompressed_size: int | None = None,
+    compressed_size: int | None = None,
+    compression: str = "unknown",
+) -> Generator[StorageMetricEmitter]:
+    """
+    Context manager which records the latency of the enclosed storage operation.
+    Can also record the compressed or uncompressed size of an object, the
+    compression ratio, the throughput, and the inverse throughput.
+
+    Yields a `StorageMetricEmitter` because for some operations (GET) the size
+    is not known until the inside of the enclosed block.
+    """
+    emitter = StorageMetricEmitter(operation, usecase)
+
+    if uncompressed_size:
+        emitter.record_uncompressed_size(uncompressed_size)
+    if compressed_size:
+        emitter.record_compressed_size(compressed_size, compression)
+
     start = time.monotonic()
+
+    # Yield an emitter in case the size becomes known inside the enclosed block
     try:
-        yield measurement
+        yield emitter
+
     finally:
         elapsed = time.monotonic() - start
-        metrics.timing("storage.put.latency", elapsed, tags={"usecase": usecase})
+        emitter.record_latency(elapsed)
 
-        if upload_size := measurement.upload_size:
-            metrics.distribution(
-                "storage.put.size",
-                upload_size,
-                tags={"usecase": usecase, "compression": measurement.compression or "none"},
-                unit="byte",
-            )
-        if elapsed > 0:
-            metrics.distribution(
-                "storage.put.throughput", upload_size / elapsed, tags={"usecase": usecase}
-            )
+        # If `uncompressed_size` and/or `compressed_size` have been set, we have
+        # extra metrics we can send.
+        emitter.maybe_record_compression_ratio()
+        emitter.maybe_record_throughputs()
```
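For reference, a minimal sketch of how the new context manager is meant to be used. The context-manager and emitter method names come from the diff above; the dict-backed "storage" and the payload are hypothetical stand-ins for this sketch:

```python
from io import BytesIO

import zstandard

from sentry.objectstore.metrics import measure_storage_operation

# A dict-backed stand-in for the real storage backend, just for this sketch.
blobs: dict[str, bytes] = {}

data = b"example payload" * 1024  # hypothetical uncompressed payload
compressed = zstandard.compress(data)

# Sizes known up front (PUT): pass them in. This emits latency, both sizes,
# both throughputs and inverse throughputs, and the compression ratio.
with measure_storage_operation("put", "attachments", len(data), len(compressed), "zstd"):
    blobs["some/path"] = compressed

# Size only known inside the block (GET): report it via the yielded emitter.
# Latency plus the compressed-size metrics are emitted; the compression ratio
# is skipped because the uncompressed size is never recorded.
with measure_storage_operation("get", "attachments") as metric_emitter:
    payload = blobs["some/path"]
    metric_emitter.record_compressed_size(len(payload), "zstd")
```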

src/sentry/objectstore/service.py

Lines changed: 24 additions & 26 deletions
```diff
@@ -16,8 +16,8 @@
     Metadata,
     format_expiration,
 )
-from sentry.objectstore.metrics import measure_storage_put
-from sentry.utils import jwt, metrics
+from sentry.objectstore.metrics import measure_storage_operation
+from sentry.utils import jwt
 
 Permission = Literal["read", "write"]
 
@@ -130,7 +130,7 @@ def put(
         for k, v in metadata.items():
             headers[f"{HEADER_META_PREFIX}{k}"] = v
 
-        with measure_storage_put(None, self._usecase, compression) as measurement:
+        with measure_storage_operation("put", self._usecase) as metric_emitter:
             response = self._pool.request(
                 "PUT",
                 f"/{id}" if id else "/",
@@ -142,15 +142,11 @@ def put(
             raise_for_status(response)
             res = response.json()
 
-            measurement.upload_size = body.tell()
-            if compression != "none":
-                metrics.distribution(
-                    "storage.put.size",
-                    original_body.tell(),
-                    tags={"usecase": self._usecase, "compression": "none"},
-                    unit="byte",
-                )
-
+            # Must do this after streaming `body` as that's what is responsible
+            # for advancing the seek position in both streams
+            metric_emitter.record_uncompressed_size(original_body.tell())
+            if compression and compression != "none":
+                metric_emitter.record_compressed_size(body.tell(), compression)
         return res["key"]
 
     def get(self, id: str, decompress: bool = True) -> GetResult:
@@ -163,14 +159,15 @@ def get(self, id: str, decompress: bool = True) -> GetResult:
         """
         headers = self._make_headers("read")
 
-        response = self._pool.request(
-            "GET",
-            f"/{id}",
-            headers=headers,
-            preload_content=False,
-            decode_content=False,
-        )
-        raise_for_status(response)
+        with measure_storage_operation("get", self._usecase):
+            response = self._pool.request(
+                "GET",
+                f"/{id}",
+                headers=headers,
+                preload_content=False,
+                decode_content=False,
+            )
+            raise_for_status(response)
         # OR: should I use `response.stream()`?
         stream = cast(IO[bytes], response)
         metadata = Metadata.from_headers(response.headers)
@@ -193,12 +190,13 @@ def delete(self, id: str):
         """
         headers = self._make_headers("write")
 
-        response = self._pool.request(
-            "DELETE",
-            f"/{id}",
-            headers=headers,
-        )
-        raise_for_status(response)
+        with measure_storage_operation("delete", self._usecase):
+            response = self._pool.request(
+                "DELETE",
+                f"/{id}",
+                headers=headers,
+            )
+            raise_for_status(response)
 
 
 class ClientError(Exception):
```
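One behavioral change worth noting in `put`: previously the (possibly compressed) body size was always reported and the uncompressed size only when a compression method was in use; now the uncompressed size is always recorded, and the compressed size only when `compression` is set and not `"none"`. The sizes are read from the streams' `tell()` positions, which is why they can only be recorded after the request body has been fully streamed.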
