Skip to content

Commit b156cff

Browse files
committed
feat(metrics): Add experimental trace metrics behind an experiments flag
Similar to getsentry/sentry-javascript#17883, this allows the Python SDK to send the new trace metric protocol items. The code is experimental because the schema may still change. Most of it has been copied from logs (e.g. log batcher -> metrics batcher) so that we can dogfood it; once we're more sure of our approach, we can refactor.
1 parent 55e903e commit b156cff

File tree

9 files changed

+565
-0
lines changed

9 files changed

+565
-0
lines changed

sentry_sdk/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from sentry_sdk import profiler
2+
from sentry_sdk import trace_metrics
23
from sentry_sdk.scope import Scope
34
from sentry_sdk.transport import Transport, HttpTransport
45
from sentry_sdk.client import Client
@@ -49,6 +50,7 @@
4950
"monitor",
5051
"logger",
5152
"profiler",
53+
"trace_metrics",
5254
"start_session",
5355
"end_session",
5456
"set_transaction_name",
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
import os
2+
import random
3+
import threading
4+
from datetime import datetime, timezone
5+
from typing import Optional, List, Callable, TYPE_CHECKING, Any
6+
7+
from sentry_sdk.utils import format_timestamp, safe_repr
8+
from sentry_sdk.envelope import Envelope, Item, PayloadRef
9+
10+
if TYPE_CHECKING:
11+
from sentry_sdk._types import TraceMetric
12+
13+
14+
class TraceMetricsBatcher:
    """Buffers trace metrics and sends them in batches as envelopes.

    Metrics passed to ``add`` are collected in memory. A background daemon
    thread flushes the buffer every ``FLUSH_WAIT_TIME`` seconds (plus up to one
    second of random jitter), or as soon as ``add`` sees that
    ``MAX_METRICS_BEFORE_FLUSH`` metrics have accumulated. Each flush wraps all
    buffered metrics in a single ``trace_metric`` envelope item and hands the
    envelope to ``capture_func``.
    """

    # Buffer size at which ``add`` wakes the flusher thread early.
    MAX_METRICS_BEFORE_FLUSH = 100
    # Base number of seconds the flusher thread waits between flushes.
    FLUSH_WAIT_TIME = 5.0

    def __init__(
        self,
        capture_func,  # type: Callable[[Envelope], None]
    ):
        # type: (...) -> None
        """
        :param capture_func: Called with every non-empty envelope produced by
            a flush.
        """
        self._metric_buffer = []  # type: List[TraceMetric]
        self._capture_func = capture_func
        self._running = True
        # Guards the buffer and the creation of the flusher thread.
        self._lock = threading.Lock()

        # Set to wake the flusher thread before its timeout elapses.
        self._flush_event = threading.Event()  # type: threading.Event

        self._flusher = None  # type: Optional[threading.Thread]
        # Pid of the process in which the flusher thread was started.
        self._flusher_pid = None  # type: Optional[int]

    def _ensure_thread(self):
        # type: (...) -> bool
        """Start the flusher thread for the current process if needed.

        :returns: ``True`` if a flusher thread has been started for this
            process, ``False`` if the batcher is stopped or the thread could
            not be started.
        """
        if not self._running:
            return False

        pid = os.getpid()
        # Cheap check without the lock; repeated under the lock below so that
        # only one caller creates the thread.
        if self._flusher_pid == pid:
            return True

        with self._lock:
            if self._flusher_pid == pid:
                return True

            # A differing pid means no flusher was started in this process yet
            # (first use, or presumably after a fork -- TODO confirm).
            self._flusher_pid = pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # The thread could not be started; stop batching altogether
                # instead of retrying on every ``add``.
                self._running = False
                return False

        return True

    def _flush_loop(self):
        # type: (...) -> None
        """Body of the flusher thread: wait, then flush, until stopped."""
        while self._running:
            # Wake up on the timeout (with jitter) or as soon as the event is
            # set by ``add`` / ``kill``.
            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
            self._flush_event.clear()
            self._flush()

    def add(
        self,
        metric,  # type: TraceMetric
    ):
        # type: (...) -> None
        """Buffer ``metric``; silently drop it if no flusher thread is available."""
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            self._metric_buffer.append(metric)
            if len(self._metric_buffer) >= self.MAX_METRICS_BEFORE_FLUSH:
                self._flush_event.set()

    def kill(self):
        # type: (...) -> None
        """Ask the flusher thread to stop; a no-op if none is registered."""
        if self._flusher is None:
            return

        self._running = False
        # Wake the thread so it notices ``_running`` is now False.
        self._flush_event.set()
        self._flusher = None

    def flush(self):
        # type: (...) -> None
        """Send all currently buffered metrics from the calling thread."""
        self._flush()

    @staticmethod
    def _metric_to_transport_format(metric):
        # type: (TraceMetric) -> Any
        """Convert a buffered metric into the dict that is put in the envelope.

        Every attribute value is wrapped as ``{"value": ..., "type": ...}``.
        ``span_id`` and ``unit`` are only included when they are not ``None``.
        """

        def format_attribute(val):
            # type: (int | float | str | bool) -> Any
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(val, bool):
                return {"value": val, "type": "boolean"}
            if isinstance(val, int):
                return {"value": val, "type": "integer"}
            if isinstance(val, float):
                return {"value": val, "type": "double"}
            if isinstance(val, str):
                return {"value": val, "type": "string"}
            # Any other value is sent as the string produced by safe_repr.
            return {"value": safe_repr(val), "type": "string"}

        res = {
            "timestamp": metric["timestamp"],
            "trace_id": metric["trace_id"],
            "name": metric["name"],
            "type": metric["type"],
            "value": metric["value"],
            "attributes": {
                k: format_attribute(v) for (k, v) in metric["attributes"].items()
            },
        }

        if metric.get("span_id") is not None:
            res["span_id"] = metric["span_id"]

        if metric.get("unit") is not None:
            res["unit"] = metric["unit"]

        return res

    def _flush(self):
        # type: (...) -> Optional[Envelope]
        """Send the buffered metrics as one envelope and empty the buffer.

        :returns: The envelope passed to ``capture_func``, or ``None`` if the
            buffer was empty.
        """
        with self._lock:
            # Check for an empty buffer before allocating anything: the flusher
            # thread calls this on every wake-up, usually with nothing to send.
            if not self._metric_buffer:
                return None

            envelope = Envelope(
                headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
            )
            envelope.add_item(
                Item(
                    type="trace_metric",
                    content_type="application/vnd.sentry.items.trace-metric+json",
                    headers={
                        "item_count": len(self._metric_buffer),
                    },
                    payload=PayloadRef(
                        json={
                            "items": [
                                self._metric_to_transport_format(metric)
                                for metric in self._metric_buffer
                            ]
                        }
                    ),
                )
            )
            self._metric_buffer.clear()

        # Hand the envelope over outside the lock so ``add`` is not blocked
        # while it is being captured.
        self._capture_func(envelope)
        return envelope
157+

sentry_sdk/_types.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,32 @@ class SDKInfo(TypedDict):
235235
},
236236
)
237237

238+
# Kinds of trace metric that can be recorded.
TraceMetricType = Literal["counter", "gauge", "distribution"]

# A single attribute as a value together with its declared type.
TraceMetricAttributeValue = TypedDict(
    "TraceMetricAttributeValue",
    {
        "value": Union[str, bool, float, int],
        "type": Literal["string", "boolean", "double", "integer"],
    },
)

# A trace metric as it is passed around inside the SDK.
TraceMetric = TypedDict(
    "TraceMetric",
    {
        "timestamp": float,
        "trace_id": str,
        "span_id": Optional[str],
        "name": str,
        "type": TraceMetricType,
        "value": float,
        "unit": Optional[str],
        # NOTE(review): the batcher and client in this change read and write
        # raw values (str/bool/int/float) here and only wrap them when
        # serializing -- confirm whether the value type should be the raw
        # union rather than TraceMetricAttributeValue.
        "attributes": Dict[str, TraceMetricAttributeValue],
    },
)

# Callback that may modify a metric, or return None to drop it.
TraceMetricProcessor = Callable[[TraceMetric, Hint], Optional[TraceMetric]]
263+
238264
# TODO: Make a proper type definition for this (PRs welcome!)
239265
Breadcrumb = Dict[str, Any]
240266

@@ -270,6 +296,7 @@ class SDKInfo(TypedDict):
270296
"monitor",
271297
"span",
272298
"log_item",
299+
"trace_metric",
273300
]
274301
SessionStatus = Literal["ok", "exited", "crashed", "abnormal"]
275302

sentry_sdk/client.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
logger,
2626
get_before_send_log,
2727
has_logs_enabled,
28+
has_trace_metrics_enabled,
2829
)
2930
from sentry_sdk.serializer import serialize
3031
from sentry_sdk.tracing import trace
@@ -184,6 +185,7 @@ def __init__(self, options=None):
184185
self.monitor = None # type: Optional[Monitor]
185186
self.metrics_aggregator = None # type: Optional[MetricsAggregator]
186187
self.log_batcher = None # type: Optional[LogBatcher]
188+
self.trace_metrics_batcher = None # type: Optional[TraceMetricsBatcher]
187189

188190
def __getstate__(self, *args, **kwargs):
189191
# type: (*Any, **Any) -> Any
@@ -219,6 +221,10 @@ def _capture_experimental_log(self, log):
219221
# type: (Log) -> None
220222
pass
221223

224+
def _capture_trace_metric(self, metric):
225+
# type: (TraceMetric) -> None
226+
pass
227+
222228
def capture_session(self, *args, **kwargs):
223229
# type: (*Any, **Any) -> None
224230
return None
@@ -388,6 +394,13 @@ def _capture_envelope(envelope):
388394

389395
self.log_batcher = LogBatcher(capture_func=_capture_envelope)
390396

397+
self.trace_metrics_batcher = None
398+
399+
if has_trace_metrics_enabled(self.options):
400+
from sentry_sdk._trace_metrics_batcher import TraceMetricsBatcher
401+
402+
self.trace_metrics_batcher = TraceMetricsBatcher(capture_func=_capture_envelope)
403+
391404
max_request_body_size = ("always", "never", "small", "medium")
392405
if self.options["max_request_body_size"] not in max_request_body_size:
393406
raise ValueError(
@@ -967,6 +980,56 @@ def _capture_experimental_log(self, log):
967980
if self.log_batcher:
968981
self.log_batcher.add(log)
969982

983+
def _capture_trace_metric(self, metric):
    # type: (Optional[TraceMetric]) -> None
    """Enrich a trace metric with default attributes and queue it for sending.

    Does nothing when trace metrics are not enabled in the client options or
    when ``metric`` is ``None``. Otherwise the metric's ``attributes`` dict is
    updated in place, the ``before_send_trace_metric`` callback (if any) is
    applied, and the result is added to the trace metrics batcher.

    :param metric: The metric to capture; mutated in place.
    """
    if not has_trace_metrics_enabled(self.options) or metric is None:
        return

    isolation_scope = sentry_sdk.get_isolation_scope()
    attributes = metric["attributes"]

    # SDK identification always overwrites whatever the caller supplied.
    attributes["sentry.sdk.name"] = SDK_INFO["name"]
    attributes["sentry.sdk.version"] = SDK_INFO["version"]

    # Environment and release are defaults only: explicit attributes win.
    environment = self.options.get("environment")
    if environment is not None and "sentry.environment" not in attributes:
        attributes["sentry.environment"] = environment

    release = self.options.get("release")
    if release is not None and "sentry.release" not in attributes:
        attributes["sentry.release"] = release

    # Copy user fields from the isolation scope without overriding
    # attributes that are already set on the metric.
    if isolation_scope._user is not None:
        for metric_attribute, user_attribute in (
            ("user.id", "id"),
            ("user.name", "username"),
            ("user.email", "email"),
        ):
            if (
                user_attribute in isolation_scope._user
                and metric_attribute not in attributes
            ):
                attributes[metric_attribute] = isolation_scope._user[user_attribute]

    debug = self.options.get("debug", False)
    if debug:
        # Logged before the before-send callback runs, so this shows the
        # metric as captured even if the callback later changes or drops it.
        logger.debug(
            f"[Sentry Trace Metrics] [{metric.get('type')}] {metric.get('name')}: {metric.get('value')}"
        )

    from sentry_sdk.utils import get_before_send_trace_metric

    before_send_trace_metric = get_before_send_trace_metric(self.options)
    if before_send_trace_metric is not None:
        metric = before_send_trace_metric(metric, {})

    # The callback may return None to drop the metric.
    if metric is None:
        return

    if self.trace_metrics_batcher:
        self.trace_metrics_batcher.add(metric)
1032+
9701033
def capture_session(
9711034
self,
9721035
session, # type: Session
@@ -1023,6 +1086,8 @@ def close(
10231086
self.metrics_aggregator.kill()
10241087
if self.log_batcher is not None:
10251088
self.log_batcher.kill()
1089+
if self.trace_metrics_batcher is not None:
1090+
self.trace_metrics_batcher.kill()
10261091
if self.monitor:
10271092
self.monitor.kill()
10281093
self.transport.kill()
@@ -1049,6 +1114,8 @@ def flush(
10491114
self.metrics_aggregator.flush()
10501115
if self.log_batcher is not None:
10511116
self.log_batcher.flush()
1117+
if self.trace_metrics_batcher is not None:
1118+
self.trace_metrics_batcher.flush()
10521119
self.transport.flush(timeout=timeout, callback=callback)
10531120

10541121
def __enter__(self):

sentry_sdk/envelope.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ def data_category(self):
285285
return "error"
286286
elif ty == "log":
287287
return "log_item"
288+
elif ty == "trace_metric":
289+
return "trace_metric"
288290
elif ty == "client_report":
289291
return "internal"
290292
elif ty == "profile":

0 commit comments

Comments
 (0)