Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sentry_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from sentry_sdk import profiler
from sentry_sdk import trace_metrics
from sentry_sdk.scope import Scope
from sentry_sdk.transport import Transport, HttpTransport
from sentry_sdk.client import Client
Expand Down Expand Up @@ -49,6 +50,7 @@
"monitor",
"logger",
"profiler",
"trace_metrics",
"start_session",
"end_session",
"set_transaction_name",
Expand Down
157 changes: 157 additions & 0 deletions sentry_sdk/_trace_metrics_batcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import os
import random
import threading
from datetime import datetime, timezone
from typing import Optional, List, Callable, TYPE_CHECKING, Any

from sentry_sdk.utils import format_timestamp, safe_repr
from sentry_sdk.envelope import Envelope, Item, PayloadRef

if TYPE_CHECKING:
from sentry_sdk._types import TraceMetric


class TraceMetricsBatcher:
MAX_METRICS_BEFORE_FLUSH = 100
FLUSH_WAIT_TIME = 5.0

def __init__(
self,
capture_func, # type: Callable[[Envelope], None]
):
# type: (...) -> None
self._metric_buffer = [] # type: List[TraceMetric]
self._capture_func = capture_func
self._running = True
self._lock = threading.Lock()

self._flush_event = threading.Event() # type: threading.Event

self._flusher = None # type: Optional[threading.Thread]
self._flusher_pid = None # type: Optional[int]

def _ensure_thread(self):
# type: (...) -> bool
if not self._running:
return False

pid = os.getpid()
if self._flusher_pid == pid:
return True

with self._lock:
if self._flusher_pid == pid:
return True

self._flusher_pid = pid

self._flusher = threading.Thread(target=self._flush_loop)
self._flusher.daemon = True

try:
self._flusher.start()
except RuntimeError:
self._running = False
return False

return True

def _flush_loop(self):
# type: (...) -> None
while self._running:
self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
self._flush_event.clear()
self._flush()

def add(
self,
metric, # type: TraceMetric
):
# type: (...) -> None
if not self._ensure_thread() or self._flusher is None:
return None

with self._lock:
self._metric_buffer.append(metric)
if len(self._metric_buffer) >= self.MAX_METRICS_BEFORE_FLUSH:
self._flush_event.set()

def kill(self):
# type: (...) -> None
if self._flusher is None:
return

self._running = False
self._flush_event.set()
self._flusher = None

def flush(self):
# type: (...) -> None
self._flush()

@staticmethod
def _metric_to_transport_format(metric):
# type: (TraceMetric) -> Any
def format_attribute(val):
# type: (int | float | str | bool) -> Any
if isinstance(val, bool):
return {"value": val, "type": "boolean"}
if isinstance(val, int):
return {"value": val, "type": "integer"}
if isinstance(val, float):
return {"value": val, "type": "double"}
if isinstance(val, str):
return {"value": val, "type": "string"}
return {"value": safe_repr(val), "type": "string"}

res = {
"timestamp": metric["timestamp"],
"trace_id": metric["trace_id"],
"name": metric["name"],
"type": metric["type"],
"value": metric["value"],
"attributes": {
k: format_attribute(v) for (k, v) in metric["attributes"].items()
},
}

if metric.get("span_id") is not None:
res["span_id"] = metric["span_id"]

if metric.get("unit") is not None:
res["unit"] = metric["unit"]

return res

def _flush(self):
# type: (...) -> Optional[Envelope]

envelope = Envelope(
headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
)
with self._lock:
if len(self._metric_buffer) == 0:
return None

envelope.add_item(
Item(
type="trace_metric",
content_type="application/vnd.sentry.items.trace-metric+json",
headers={
"item_count": len(self._metric_buffer),
},
payload=PayloadRef(
json={
"items": [
self._metric_to_transport_format(metric)
for metric in self._metric_buffer
]
}
),
)
)
self._metric_buffer.clear()

self._capture_func(envelope)
return envelope

27 changes: 27 additions & 0 deletions sentry_sdk/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,32 @@ class SDKInfo(TypedDict):
},
)

TraceMetricType = Literal["counter", "gauge", "distribution"]

TraceMetricAttributeValue = TypedDict(
"TraceMetricAttributeValue",
{
"value": Union[str, bool, float, int],
"type": Literal["string", "boolean", "double", "integer"],
},
)

TraceMetric = TypedDict(
"TraceMetric",
{
"timestamp": float,
"trace_id": str,
"span_id": Optional[str],
"name": str,
"type": TraceMetricType,
"value": float,
"unit": Optional[str],
"attributes": dict[str, TraceMetricAttributeValue],
},
)

TraceMetricProcessor = Callable[[TraceMetric, Hint], Optional[TraceMetric]]

# TODO: Make a proper type definition for this (PRs welcome!)
Breadcrumb = Dict[str, Any]

Expand Down Expand Up @@ -270,6 +296,7 @@ class SDKInfo(TypedDict):
"monitor",
"span",
"log_item",
"trace_metric",
]
SessionStatus = Literal["ok", "exited", "crashed", "abnormal"]

Expand Down
67 changes: 67 additions & 0 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
logger,
get_before_send_log,
has_logs_enabled,
has_trace_metrics_enabled,
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace
Expand Down Expand Up @@ -184,6 +185,7 @@ def __init__(self, options=None):
self.monitor = None # type: Optional[Monitor]
self.metrics_aggregator = None # type: Optional[MetricsAggregator]
self.log_batcher = None # type: Optional[LogBatcher]
self.trace_metrics_batcher = None # type: Optional[TraceMetricsBatcher]

def __getstate__(self, *args, **kwargs):
# type: (*Any, **Any) -> Any
Expand Down Expand Up @@ -219,6 +221,10 @@ def _capture_experimental_log(self, log):
# type: (Log) -> None
pass

def _capture_trace_metric(self, metric):
# type: (TraceMetric) -> None
pass

def capture_session(self, *args, **kwargs):
# type: (*Any, **Any) -> None
return None
Expand Down Expand Up @@ -388,6 +394,13 @@ def _capture_envelope(envelope):

self.log_batcher = LogBatcher(capture_func=_capture_envelope)

self.trace_metrics_batcher = None

if has_trace_metrics_enabled(self.options):
from sentry_sdk._trace_metrics_batcher import TraceMetricsBatcher

self.trace_metrics_batcher = TraceMetricsBatcher(capture_func=_capture_envelope)

max_request_body_size = ("always", "never", "small", "medium")
if self.options["max_request_body_size"] not in max_request_body_size:
raise ValueError(
Expand Down Expand Up @@ -967,6 +980,56 @@ def _capture_experimental_log(self, log):
if self.log_batcher:
self.log_batcher.add(log)

def _capture_trace_metric(self, metric):
# type: (Optional[TraceMetric]) -> None
if not has_trace_metrics_enabled(self.options) or metric is None:
return

current_scope = sentry_sdk.get_current_scope()
isolation_scope = sentry_sdk.get_isolation_scope()

metric["attributes"]["sentry.sdk.name"] = SDK_INFO["name"]
metric["attributes"]["sentry.sdk.version"] = SDK_INFO["version"]

environment = self.options.get("environment")
if environment is not None and "sentry.environment" not in metric["attributes"]:
metric["attributes"]["sentry.environment"] = environment

release = self.options.get("release")
if release is not None and "sentry.release" not in metric["attributes"]:
metric["attributes"]["sentry.release"] = release

if isolation_scope._user is not None:
for metric_attribute, user_attribute in (
("user.id", "id"),
("user.name", "username"),
("user.email", "email"),
):
if (
user_attribute in isolation_scope._user
and metric_attribute not in metric["attributes"]
):
metric["attributes"][metric_attribute] = isolation_scope._user[
user_attribute
]

debug = self.options.get("debug", False)
if debug:
logger.debug(
f"[Sentry Trace Metrics] [{metric.get('type')}] {metric.get('name')}: {metric.get('value')}"
)

from sentry_sdk.utils import get_before_send_trace_metric
before_send_trace_metric = get_before_send_trace_metric(self.options)
if before_send_trace_metric is not None:
metric = before_send_trace_metric(metric, {})

if metric is None:
return

if self.trace_metrics_batcher:
self.trace_metrics_batcher.add(metric)

def capture_session(
self,
session, # type: Session
Expand Down Expand Up @@ -1023,6 +1086,8 @@ def close(
self.metrics_aggregator.kill()
if self.log_batcher is not None:
self.log_batcher.kill()
if self.trace_metrics_batcher is not None:
self.trace_metrics_batcher.kill()
if self.monitor:
self.monitor.kill()
self.transport.kill()
Expand All @@ -1049,6 +1114,8 @@ def flush(
self.metrics_aggregator.flush()
if self.log_batcher is not None:
self.log_batcher.flush()
if self.trace_metrics_batcher is not None:
self.trace_metrics_batcher.flush()
self.transport.flush(timeout=timeout, callback=callback)

def __enter__(self):
Expand Down
2 changes: 2 additions & 0 deletions sentry_sdk/envelope.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ def data_category(self):
return "error"
elif ty == "log":
return "log_item"
elif ty == "trace_metric":
return "trace_metric"
elif ty == "client_report":
return "internal"
elif ty == "profile":
Expand Down
Loading
Loading