Skip to content

Commit 1f8c008

Browse files
k-fish and sentrivana authored
feat(metrics): Add trace metrics behind an experiments flag (#4898)
### Summary Similar to getsentry/sentry-javascript#17883, this allows the Python SDK to send the new trace metric protocol items. The code is experimental because the schema may still change. Most of it has been copied from logs (e.g. log batcher -> metrics batcher) so that we can dogfood it; once we are more sure of the approach, we can refactor. Closes LOGS-367 --------- Co-authored-by: Ivana Kellyer <[email protected]>
1 parent a049747 commit 1f8c008

File tree

9 files changed

+576
-2
lines changed

9 files changed

+576
-2
lines changed

sentry_sdk/_metrics.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""
2+
NOTE: This file contains experimental code that may be changed or removed at any
3+
time without prior notice.
4+
"""
5+
6+
import time
7+
from typing import Any, Optional, TYPE_CHECKING, Union
8+
9+
import sentry_sdk
10+
from sentry_sdk.utils import safe_repr
11+
12+
if TYPE_CHECKING:
13+
from sentry_sdk._types import Metric, MetricType
14+
15+
16+
def _capture_metric(
    name,  # type: str
    metric_type,  # type: MetricType
    value,  # type: float
    unit=None,  # type: Optional[str]
    attributes=None,  # type: Optional[dict[str, Any]]
):
    # type: (...) -> None
    """
    Assemble a metric dict and pass it to the current client.

    :param name: Name stored under the ``"name"`` key.
    :param metric_type: Stored under the ``"type"`` key.
    :param value: Converted with ``float()`` before being stored.
    :param unit: Optional unit, stored as given (``None`` when omitted).
    :param attributes: Optional mapping of extra attributes. Values that are
        not ``str``, ``int``, ``bool`` or ``float`` are replaced by their
        ``safe_repr`` string.
    """
    client = sentry_sdk.get_client()

    # Types that are stored untouched; anything else is stringified.
    passthrough_types = (str, int, bool, float)

    attrs = {
        key: (raw if isinstance(raw, passthrough_types) else safe_repr(raw))
        for key, raw in (attributes or {}).items()
    }  # type: dict[str, Union[str, bool, float, int]]

    recorded_at = time.time()
    numeric_value = float(value)

    metric = {
        "timestamp": recorded_at,
        # Left empty here; presumably the client fills in the trace
        # context before sending -- confirm against the client code.
        "trace_id": None,
        "span_id": None,
        "name": name,
        "type": metric_type,
        "value": numeric_value,
        "unit": unit,
        "attributes": attrs,
    }  # type: Metric

    client._capture_metric(metric)
52+
53+
54+
def count(
    name,  # type: str
    value,  # type: float
    unit=None,  # type: Optional[str]
    attributes=None,  # type: Optional[dict[str, Any]]
):
    # type: (...) -> None
    """
    Capture a metric of type ``"counter"``.

    All arguments are forwarded unchanged to ``_capture_metric``.
    """
    _capture_metric(
        name=name,
        metric_type="counter",
        value=value,
        unit=unit,
        attributes=attributes,
    )
62+
63+
64+
def gauge(
    name,  # type: str
    value,  # type: float
    unit=None,  # type: Optional[str]
    attributes=None,  # type: Optional[dict[str, Any]]
):
    # type: (...) -> None
    """
    Capture a metric of type ``"gauge"``.

    All arguments are forwarded unchanged to ``_capture_metric``.
    """
    _capture_metric(
        name=name,
        metric_type="gauge",
        value=value,
        unit=unit,
        attributes=attributes,
    )
72+
73+
74+
def distribution(
    name,  # type: str
    value,  # type: float
    unit=None,  # type: Optional[str]
    attributes=None,  # type: Optional[dict[str, Any]]
):
    # type: (...) -> None
    """
    Capture a metric of type ``"distribution"``.

    All arguments are forwarded unchanged to ``_capture_metric``.
    """
    _capture_metric(
        name=name,
        metric_type="distribution",
        value=value,
        unit=unit,
        attributes=attributes,
    )

sentry_sdk/_metrics_batcher.py

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import os
2+
import random
3+
import threading
4+
from datetime import datetime, timezone
5+
from typing import Optional, List, Callable, TYPE_CHECKING, Any, Union
6+
7+
from sentry_sdk.utils import format_timestamp, safe_repr
8+
from sentry_sdk.envelope import Envelope, Item, PayloadRef
9+
10+
if TYPE_CHECKING:
11+
from sentry_sdk._types import Metric
12+
13+
14+
class MetricsBatcher:
    """
    Buffer metrics in memory and hand them to ``capture_func`` in batches.

    Each batch is wrapped in a single envelope item of type
    ``"trace_metric"``. A daemon thread flushes the buffer every
    ``FLUSH_WAIT_TIME`` seconds (plus up to one second of random jitter),
    or earlier once ``MAX_METRICS_BEFORE_FLUSH`` metrics are buffered.
    """

    # Buffer size at which the flusher thread is woken up early.
    MAX_METRICS_BEFORE_FLUSH = 100
    # Base number of seconds the flusher thread waits between flushes.
    FLUSH_WAIT_TIME = 5.0

    def __init__(
        self,
        capture_func,  # type: Callable[[Envelope], None]
    ):
        # type: (...) -> None
        """
        :param capture_func: Called with every non-empty envelope that a
            flush produces.
        """
        self._capture_func = capture_func
        self._metric_buffer = []  # type: List[Metric]

        # Guards the buffer and the creation of the flusher thread.
        self._lock = threading.Lock()
        # Set to wake the flusher thread before its timeout elapses.
        self._flush_event = threading.Event()  # type: threading.Event

        self._running = True
        self._flusher = None  # type: Optional[threading.Thread]
        self._flusher_pid = None  # type: Optional[int]

    def _ensure_thread(self):
        # type: (...) -> bool
        """
        Start the flusher thread for the current process if necessary.

        Returns ``True`` when a flusher has been started for this pid and
        ``False`` when the batcher is stopped or the thread cannot start.
        """
        if not self._running:
            return False

        current_pid = os.getpid()

        # Fast path without the lock: a flusher already exists for this pid.
        if self._flusher_pid == current_pid:
            return True

        with self._lock:
            # Re-check under the lock; another thread may have got here first.
            if self._flusher_pid == current_pid:
                return True

            self._flusher_pid = current_pid
            self._flusher = threading.Thread(target=self._flush_loop, daemon=True)

            try:
                self._flusher.start()
            except RuntimeError:
                # The thread could not be started; stop accepting metrics.
                self._running = False
                return False

        return True

    def _flush_loop(self):
        # type: (...) -> None
        """Body of the flusher thread: wait, then flush, until stopped."""
        while self._running:
            timeout = self.FLUSH_WAIT_TIME + random.random()
            self._flush_event.wait(timeout)
            self._flush_event.clear()
            self._flush()

    def add(
        self,
        metric,  # type: Metric
    ):
        # type: (...) -> None
        """
        Append ``metric`` to the buffer.

        The metric is dropped when no flusher thread is available. Reaching
        ``MAX_METRICS_BEFORE_FLUSH`` wakes the flusher thread.
        """
        flusher_available = self._ensure_thread() and self._flusher is not None
        if not flusher_available:
            return None

        with self._lock:
            self._metric_buffer.append(metric)
            buffer_full = len(self._metric_buffer) >= self.MAX_METRICS_BEFORE_FLUSH
            if buffer_full:
                self._flush_event.set()

    def kill(self):
        # type: (...) -> None
        """Stop the flusher loop and forget the thread; no-op if none exists."""
        if self._flusher is None:
            return

        self._running = False
        # Wake the thread so it notices that ``_running`` changed.
        self._flush_event.set()
        self._flusher = None

    def flush(self):
        # type: (...) -> None
        """Send the buffered metrics right away on the calling thread."""
        self._flush()

    @staticmethod
    def _metric_to_transport_format(metric):
        # type: (Metric) -> Any
        """
        Convert a metric into the dict placed in the envelope payload.

        Every attribute becomes ``{"value": ..., "type": ...}``.
        ``span_id`` and ``unit`` are only included when they are not ``None``.
        """
        # ``bool`` must come before ``int``: bool is a subclass of int.
        type_labels = (
            (bool, "boolean"),
            (int, "integer"),
            (float, "double"),
            (str, "string"),
        )

        def format_attribute(val):
            # type: (Union[int, float, str, bool]) -> Any
            for python_type, label in type_labels:
                if isinstance(val, python_type):
                    return {"value": val, "type": label}
            # Anything else is sent as its repr string.
            return {"value": safe_repr(val), "type": "string"}

        formatted_attributes = {
            key: format_attribute(raw) for (key, raw) in metric["attributes"].items()
        }

        res = {
            "timestamp": metric["timestamp"],
            "trace_id": metric["trace_id"],
            "name": metric["name"],
            "type": metric["type"],
            "value": metric["value"],
            "attributes": formatted_attributes,
        }

        for optional_key in ("span_id", "unit"):
            if metric.get(optional_key) is not None:
                res[optional_key] = metric[optional_key]

        return res

    def _flush(self):
        # type: (...) -> Optional[Envelope]
        """
        Drain the buffer into one envelope and pass it to ``capture_func``.

        Returns the envelope, or ``None`` when the buffer was empty.
        """
        sent_at = format_timestamp(datetime.now(timezone.utc))
        envelope = Envelope(headers={"sent_at": sent_at})

        with self._lock:
            if not self._metric_buffer:
                return None

            item_count = len(self._metric_buffer)
            items = [
                self._metric_to_transport_format(buffered)
                for buffered in self._metric_buffer
            ]
            envelope.add_item(
                Item(
                    type="trace_metric",
                    content_type="application/vnd.sentry.items.trace-metric+json",
                    headers={
                        "item_count": item_count,
                    },
                    payload=PayloadRef(json={"items": items}),
                )
            )
            self._metric_buffer.clear()

        # Called after the lock is released.
        self._capture_func(envelope)
        return envelope

sentry_sdk/_types.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,32 @@ class SDKInfo(TypedDict):
234234
},
235235
)
236236

237+
# The metric kinds that can be captured.
MetricType = Literal["counter", "gauge", "distribution"]

# A single attribute value together with a label naming its type.
MetricAttributeValue = TypedDict(
    "MetricAttributeValue",
    {
        "value": Union[str, bool, float, int],
        "type": Literal["string", "boolean", "double", "integer"],
    },
)

# A captured metric. ``trace_id``, ``span_id`` and ``unit`` may be None.
Metric = TypedDict(
    "Metric",
    {
        "timestamp": float,
        "trace_id": Optional[str],
        "span_id": Optional[str],
        "name": str,
        "type": MetricType,
        "value": float,
        "unit": Optional[str],
        # Spelled with typing.Dict/Union (not ``dict[...]`` / ``X | Y``) to
        # match MetricAttributeValue above and to stay evaluable on Python
        # versions that lack PEP 585 / PEP 604 runtime support.
        "attributes": Dict[str, Union[str, bool, float, int]],
    },
)
260+
261+
# Signature of a callable that takes a Metric and a Hint and returns either a
# Metric or None.
# NOTE(review): presumably returning None drops the metric -- confirm against
# the code that invokes this callable.
MetricProcessor = Callable[[Metric, Hint], Optional[Metric]]
262+
237263
# TODO: Make a proper type definition for this (PRs welcome!)
238264
Breadcrumb = Dict[str, Any]
239265

@@ -268,6 +294,7 @@ class SDKInfo(TypedDict):
268294
"monitor",
269295
"span",
270296
"log_item",
297+
"trace_metric",
271298
]
272299
SessionStatus = Literal["ok", "exited", "crashed", "abnormal"]
273300

0 commit comments

Comments
 (0)