diff --git a/sentry_sdk/_metrics.py b/sentry_sdk/_metrics.py new file mode 100644 index 0000000000..03bde137bd --- /dev/null +++ b/sentry_sdk/_metrics.py @@ -0,0 +1,81 @@ +""" +NOTE: This file contains experimental code that may be changed or removed at any +time without prior notice. +""" + +import time +from typing import Any, Optional, TYPE_CHECKING, Union + +import sentry_sdk +from sentry_sdk.utils import safe_repr + +if TYPE_CHECKING: + from sentry_sdk._types import Metric, MetricType + + +def _capture_metric( + name, # type: str + metric_type, # type: MetricType + value, # type: float + unit=None, # type: Optional[str] + attributes=None, # type: Optional[dict[str, Any]] +): + # type: (...) -> None + client = sentry_sdk.get_client() + + attrs = {} # type: dict[str, Union[str, bool, float, int]] + if attributes: + for k, v in attributes.items(): + attrs[k] = ( + v + if ( + isinstance(v, str) + or isinstance(v, int) + or isinstance(v, bool) + or isinstance(v, float) + ) + else safe_repr(v) + ) + + metric = { + "timestamp": time.time(), + "trace_id": None, + "span_id": None, + "name": name, + "type": metric_type, + "value": float(value), + "unit": unit, + "attributes": attrs, + } # type: Metric + + client._capture_metric(metric) + + +def count( + name, # type: str + value, # type: float + unit=None, # type: Optional[str] + attributes=None, # type: Optional[dict[str, Any]] +): + # type: (...) -> None + _capture_metric(name, "counter", value, unit, attributes) + + +def gauge( + name, # type: str + value, # type: float + unit=None, # type: Optional[str] + attributes=None, # type: Optional[dict[str, Any]] +): + # type: (...) -> None + _capture_metric(name, "gauge", value, unit, attributes) + + +def distribution( + name, # type: str + value, # type: float + unit=None, # type: Optional[str] + attributes=None, # type: Optional[dict[str, Any]] +): + # type: (...) -> None + _capture_metric(name, "distribution", value, unit, attributes) diff --git a/sentry_sdk/_metrics_batcher.py b/sentry_sdk/_metrics_batcher.py new file mode 100644 index 0000000000..fd9a5d732b --- /dev/null +++ b/sentry_sdk/_metrics_batcher.py @@ -0,0 +1,156 @@ +import os +import random +import threading +from datetime import datetime, timezone +from typing import Optional, List, Callable, TYPE_CHECKING, Any, Union + +from sentry_sdk.utils import format_timestamp, safe_repr +from sentry_sdk.envelope import Envelope, Item, PayloadRef + +if TYPE_CHECKING: + from sentry_sdk._types import Metric + + +class MetricsBatcher: + MAX_METRICS_BEFORE_FLUSH = 100 + FLUSH_WAIT_TIME = 5.0 + + def __init__( + self, + capture_func, # type: Callable[[Envelope], None] + ): + # type: (...) -> None + self._metric_buffer = [] # type: List[Metric] + self._capture_func = capture_func + self._running = True + self._lock = threading.Lock() + + self._flush_event = threading.Event() # type: threading.Event + + self._flusher = None # type: Optional[threading.Thread] + self._flusher_pid = None # type: Optional[int] + + def _ensure_thread(self): + # type: (...) -> bool + if not self._running: + return False + + pid = os.getpid() + if self._flusher_pid == pid: + return True + + with self._lock: + if self._flusher_pid == pid: + return True + + self._flusher_pid = pid + + self._flusher = threading.Thread(target=self._flush_loop) + self._flusher.daemon = True + + try: + self._flusher.start() + except RuntimeError: + self._running = False + return False + + return True + + def _flush_loop(self): + # type: (...) -> None + while self._running: + self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random()) + self._flush_event.clear() + self._flush() + + def add( + self, + metric, # type: Metric + ): + # type: (...) -> None + if not self._ensure_thread() or self._flusher is None: + return None + + with self._lock: + self._metric_buffer.append(metric) + if len(self._metric_buffer) >= self.MAX_METRICS_BEFORE_FLUSH: + self._flush_event.set() + + def kill(self): + # type: (...) -> None + if self._flusher is None: + return + + self._running = False + self._flush_event.set() + self._flusher = None + + def flush(self): + # type: (...) -> None + self._flush() + + @staticmethod + def _metric_to_transport_format(metric): + # type: (Metric) -> Any + def format_attribute(val): + # type: (Union[int, float, str, bool]) -> Any + if isinstance(val, bool): + return {"value": val, "type": "boolean"} + if isinstance(val, int): + return {"value": val, "type": "integer"} + if isinstance(val, float): + return {"value": val, "type": "double"} + if isinstance(val, str): + return {"value": val, "type": "string"} + return {"value": safe_repr(val), "type": "string"} + + res = { + "timestamp": metric["timestamp"], + "trace_id": metric["trace_id"], + "name": metric["name"], + "type": metric["type"], + "value": metric["value"], + "attributes": { + k: format_attribute(v) for (k, v) in metric["attributes"].items() + }, + } + + if metric.get("span_id") is not None: + res["span_id"] = metric["span_id"] + + if metric.get("unit") is not None: + res["unit"] = metric["unit"] + + return res + + def _flush(self): + # type: (...) -> Optional[Envelope] + + envelope = Envelope( + headers={"sent_at": format_timestamp(datetime.now(timezone.utc))} + ) + with self._lock: + if len(self._metric_buffer) == 0: + return None + + envelope.add_item( + Item( + type="trace_metric", + content_type="application/vnd.sentry.items.trace-metric+json", + headers={ + "item_count": len(self._metric_buffer), + }, + payload=PayloadRef( + json={ + "items": [ + self._metric_to_transport_format(metric) + for metric in self._metric_buffer + ] + } + ), + ) + ) + self._metric_buffer.clear() + + self._capture_func(envelope) + return envelope diff --git a/sentry_sdk/_types.py b/sentry_sdk/_types.py index d057f215e4..66ed7df4f7 100644 --- a/sentry_sdk/_types.py +++ b/sentry_sdk/_types.py @@ -234,6 +234,32 @@ class SDKInfo(TypedDict): }, ) + MetricType = Literal["counter", "gauge", "distribution"] + + MetricAttributeValue = TypedDict( + "MetricAttributeValue", + { + "value": Union[str, bool, float, int], + "type": Literal["string", "boolean", "double", "integer"], + }, + ) + + Metric = TypedDict( + "Metric", + { + "timestamp": float, + "trace_id": Optional[str], + "span_id": Optional[str], + "name": str, + "type": MetricType, + "value": float, + "unit": Optional[str], + "attributes": dict[str, str | bool | float | int], + }, + ) + + MetricProcessor = Callable[[Metric, Hint], Optional[Metric]] + # TODO: Make a proper type definition for this (PRs welcome!) Breadcrumb = Dict[str, Any] @@ -268,6 +294,7 @@ class SDKInfo(TypedDict): "monitor", "span", "log_item", + "trace_metric", ] SessionStatus = Literal["ok", "exited", "crashed", "abnormal"] diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index 9401c3f0b0..d17f922642 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -24,7 +24,9 @@ is_gevent, logger, get_before_send_log, + get_before_send_metric, has_logs_enabled, + has_metrics_enabled, ) from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace @@ -59,13 +61,14 @@ from typing import Union from typing import TypeVar - from sentry_sdk._types import Event, Hint, SDKInfo, Log + from sentry_sdk._types import Event, Hint, SDKInfo, Log, Metric from sentry_sdk.integrations import Integration from sentry_sdk.scope import Scope from sentry_sdk.session import Session from sentry_sdk.spotlight import SpotlightClient from sentry_sdk.transport import Transport from sentry_sdk._log_batcher import LogBatcher + from sentry_sdk._metrics_batcher import MetricsBatcher I = TypeVar("I", bound=Integration) # noqa: E741 @@ -182,6 +185,7 @@ def __init__(self, options=None): self.transport = None # type: Optional[Transport] self.monitor = None # type: Optional[Monitor] self.log_batcher = None # type: Optional[LogBatcher] + self.metrics_batcher = None # type: Optional[MetricsBatcher] def __getstate__(self, *args, **kwargs): # type: (*Any, **Any) -> Any @@ -217,6 +221,10 @@ def _capture_log(self, log): # type: (Log) -> None pass + def _capture_metric(self, metric): + # type: (Metric) -> None + pass + def capture_session(self, *args, **kwargs): # type: (*Any, **Any) -> None return None @@ -366,6 +374,13 @@ def _capture_envelope(envelope): self.log_batcher = LogBatcher(capture_func=_capture_envelope) + self.metrics_batcher = None + + if has_metrics_enabled(self.options): + from sentry_sdk._metrics_batcher import MetricsBatcher + + self.metrics_batcher = MetricsBatcher(capture_func=_capture_envelope) + max_request_body_size = ("always", "never", "small", "medium") if self.options["max_request_body_size"] not in max_request_body_size: raise ValueError( @@ -944,6 +959,65 @@ def _capture_log(self, log): if self.log_batcher: self.log_batcher.add(log) + def _capture_metric(self, metric): + # type: (Optional[Metric]) -> None + if not has_metrics_enabled(self.options) or metric is None: + return + + isolation_scope = sentry_sdk.get_isolation_scope() + + metric["attributes"]["sentry.sdk.name"] = SDK_INFO["name"] + metric["attributes"]["sentry.sdk.version"] = SDK_INFO["version"] + + environment = self.options.get("environment") + if environment is not None and "sentry.environment" not in metric["attributes"]: + metric["attributes"]["sentry.environment"] = environment + + release = self.options.get("release") + if release is not None and "sentry.release" not in metric["attributes"]: + metric["attributes"]["sentry.release"] = release + + span = sentry_sdk.get_current_span() + metric["trace_id"] = "00000000-0000-0000-0000-000000000000" + + if span: + metric["trace_id"] = span.trace_id + metric["span_id"] = span.span_id + else: + propagation_context = isolation_scope.get_active_propagation_context() + if propagation_context and propagation_context.trace_id: + metric["trace_id"] = propagation_context.trace_id + + if isolation_scope._user is not None: + for metric_attribute, user_attribute in ( + ("user.id", "id"), + ("user.name", "username"), + ("user.email", "email"), + ): + if ( + user_attribute in isolation_scope._user + and metric_attribute not in metric["attributes"] + ): + metric["attributes"][metric_attribute] = isolation_scope._user[ + user_attribute + ] + + debug = self.options.get("debug", False) + if debug: + logger.debug( + f"[Sentry Metrics] [{metric.get('type')}] {metric.get('name')}: {metric.get('value')}" + ) + + before_send_metric = get_before_send_metric(self.options) + if before_send_metric is not None: + metric = before_send_metric(metric, {}) + + if metric is None: + return + + if self.metrics_batcher: + self.metrics_batcher.add(metric) + def capture_session( self, session, # type: Session @@ -998,6 +1072,8 @@ def close( self.session_flusher.kill() if self.log_batcher is not None: self.log_batcher.kill() + if self.metrics_batcher is not None: + self.metrics_batcher.kill() if self.monitor: self.monitor.kill() self.transport.kill() @@ -1022,6 +1098,8 @@ def flush( self.session_flusher.flush() if self.log_batcher is not None: self.log_batcher.flush() + if self.metrics_batcher is not None: + self.metrics_batcher.flush() self.transport.flush(timeout=timeout, callback=callback) def __enter__(self): diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 0f71a0d460..12654cc76d 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -52,6 +52,7 @@ class CompressionAlgo(Enum): Hint, Log, MeasurementUnit, + Metric, ProfilerMode, TracesSampler, TransactionProcessor, @@ -77,6 +78,8 @@ class CompressionAlgo(Enum): "transport_http2": Optional[bool], "enable_logs": Optional[bool], "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]], + "enable_metrics": Optional[bool], + "before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]], }, total=False, ) diff --git a/sentry_sdk/envelope.py b/sentry_sdk/envelope.py index b26c458d41..56bb5fde73 100644 --- a/sentry_sdk/envelope.py +++ b/sentry_sdk/envelope.py @@ -285,6 +285,8 @@ def data_category(self): return "error" elif ty == "log": return "log_item" + elif ty == "trace_metric": + return "trace_metric" elif ty == "client_report": return "internal" elif ty == "profile": diff --git a/sentry_sdk/types.py b/sentry_sdk/types.py index 1a65247584..8b28166462 100644 --- a/sentry_sdk/types.py +++ b/sentry_sdk/types.py @@ -21,6 +21,7 @@ Log, MonitorConfig, SamplingContext, + Metric, ) else: from typing import Any @@ -35,6 +36,7 @@ Log = Any MonitorConfig = Any SamplingContext = Any + Metric = Any __all__ = ( @@ -46,4 +48,5 @@ "Log", "MonitorConfig", "SamplingContext", + "Metric", ) diff --git a/sentry_sdk/utils.py b/sentry_sdk/utils.py index 2083fd296c..cd825b29e2 100644 --- a/sentry_sdk/utils.py +++ b/sentry_sdk/utils.py @@ -59,7 +59,7 @@ from gevent.hub import Hub - from sentry_sdk._types import Event, ExcInfo, Log, Hint + from sentry_sdk._types import Event, ExcInfo, Log, Hint, Metric P = ParamSpec("P") R = TypeVar("R") @@ -2013,3 +2013,19 @@ def get_before_send_log(options): return options.get("before_send_log") or options["_experiments"].get( "before_send_log" ) + + +def has_metrics_enabled(options): + # type: (Optional[dict[str, Any]]) -> bool + if options is None: + return False + + return bool(options["_experiments"].get("enable_metrics", False)) + + +def get_before_send_metric(options): + # type: (Optional[dict[str, Any]]) -> Optional[Callable[[Metric, Hint], Optional[Metric]]] + if options is None: + return None + + return options["_experiments"].get("before_send_metric") diff --git a/tests/test_metrics.py b/tests/test_metrics.py new file mode 100644 index 0000000000..5e774227fd --- /dev/null +++ b/tests/test_metrics.py @@ -0,0 +1,208 @@ +import json +import sys +from typing import List, Any, Mapping +import pytest + +import sentry_sdk +from sentry_sdk import _metrics +from sentry_sdk import get_client +from sentry_sdk.envelope import Envelope +from sentry_sdk.types import Metric + + +def envelopes_to_metrics(envelopes): + # type: (List[Envelope]) -> List[Metric] + res = [] # type: List[Metric] + for envelope in envelopes: + for item in envelope.items: + if item.type == "trace_metric": + for metric_json in item.payload.json["items"]: + metric = { + "timestamp": metric_json["timestamp"], + "trace_id": metric_json["trace_id"], + "span_id": metric_json.get("span_id"), + "name": metric_json["name"], + "type": metric_json["type"], + "value": metric_json["value"], + "unit": metric_json.get("unit"), + "attributes": { + k: v["value"] + for (k, v) in metric_json["attributes"].items() + }, + } # type: Metric + res.append(metric) + return res + + +def test_metrics_disabled_by_default(sentry_init, capture_envelopes): + sentry_init() + + envelopes = capture_envelopes() + + _metrics.count("test.counter", 1) + _metrics.gauge("test.gauge", 42) + _metrics.distribution("test.distribution", 200) + + assert len(envelopes) == 0 + + +def test_metrics_basics(sentry_init, capture_envelopes): + sentry_init(_experiments={"enable_metrics": True}) + envelopes = capture_envelopes() + + _metrics.count("test.counter", 1) + _metrics.gauge("test.gauge", 42, unit="millisecond") + _metrics.distribution("test.distribution", 200, unit="second") + + get_client().flush() + metrics = envelopes_to_metrics(envelopes) + + assert len(metrics) == 3 + + assert metrics[0]["name"] == "test.counter" + assert metrics[0]["type"] == "counter" + assert metrics[0]["value"] == 1.0 + assert metrics[0]["unit"] is None + assert "sentry.sdk.name" in metrics[0]["attributes"] + assert "sentry.sdk.version" in metrics[0]["attributes"] + + assert metrics[1]["name"] == "test.gauge" + assert metrics[1]["type"] == "gauge" + assert metrics[1]["value"] == 42.0 + assert metrics[1]["unit"] == "millisecond" + + assert metrics[2]["name"] == "test.distribution" + assert metrics[2]["type"] == "distribution" + assert metrics[2]["value"] == 200.0 + assert metrics[2]["unit"] == "second" + + +def test_metrics_experimental_option(sentry_init, capture_envelopes): + sentry_init(_experiments={"enable_metrics": True}) + envelopes = capture_envelopes() + + _metrics.count("test.counter", 5) + + get_client().flush() + + metrics = envelopes_to_metrics(envelopes) + assert len(metrics) == 1 + + assert metrics[0]["name"] == "test.counter" + assert metrics[0]["type"] == "counter" + assert metrics[0]["value"] == 5.0 + + +def test_metrics_with_attributes(sentry_init, capture_envelopes): + sentry_init( + _experiments={"enable_metrics": True}, release="1.0.0", environment="test" + ) + envelopes = capture_envelopes() + + _metrics.count( + "test.counter", 1, attributes={"endpoint": "/api/test", "status": "success"} + ) + + get_client().flush() + + metrics = envelopes_to_metrics(envelopes) + assert len(metrics) == 1 + + assert metrics[0]["attributes"]["endpoint"] == "/api/test" + assert metrics[0]["attributes"]["status"] == "success" + assert metrics[0]["attributes"]["sentry.release"] == "1.0.0" + assert metrics[0]["attributes"]["sentry.environment"] == "test" + + +def test_metrics_with_user(sentry_init, capture_envelopes): + sentry_init(_experiments={"enable_metrics": True}) + envelopes = capture_envelopes() + + sentry_sdk.set_user( + {"id": "user-123", "email": "test@example.com", "username": "testuser"} + ) + _metrics.count("test.user.counter", 1) + + get_client().flush() + + metrics = envelopes_to_metrics(envelopes) + assert len(metrics) == 1 + + assert metrics[0]["attributes"]["user.id"] == "user-123" + assert metrics[0]["attributes"]["user.email"] == "test@example.com" + assert metrics[0]["attributes"]["user.name"] == "testuser" + + +def test_metrics_with_span(sentry_init, capture_envelopes): + sentry_init(_experiments={"enable_metrics": True}, traces_sample_rate=1.0) + envelopes = capture_envelopes() + + with sentry_sdk.start_transaction(op="test", name="test-span"): + _metrics.count("test.span.counter", 1) + + get_client().flush() + + metrics = envelopes_to_metrics(envelopes) + assert len(metrics) == 1 + + assert metrics[0]["trace_id"] is not None + assert metrics[0]["trace_id"] != "00000000-0000-0000-0000-000000000000" + assert metrics[0]["span_id"] is not None + + +def test_metrics_tracing_without_performance(sentry_init, capture_envelopes): + sentry_init(_experiments={"enable_metrics": True}) + envelopes = capture_envelopes() + + _metrics.count("test.span.counter", 1) + + get_client().flush() + + metrics = envelopes_to_metrics(envelopes) + assert len(metrics) == 1 + + assert metrics[0]["trace_id"] is not None + assert metrics[0]["trace_id"] != "00000000-0000-0000-0000-000000000000" + assert metrics[0]["span_id"] is None + + +def test_metrics_before_send(sentry_init, capture_envelopes): + before_metric_called = False + + def _before_metric(record, hint): + nonlocal before_metric_called + + assert set(record.keys()) == { + "timestamp", + "trace_id", + "span_id", + "name", + "type", + "value", + "unit", + "attributes", + } + + if record["name"] == "test.skip": + return None + + before_metric_called = True + return record + + sentry_init( + _experiments={ + "enable_metrics": True, + "before_send_metric": _before_metric, + }, + ) + envelopes = capture_envelopes() + + _metrics.count("test.skip", 1) + _metrics.count("test.keep", 1) + + get_client().flush() + + metrics = envelopes_to_metrics(envelopes) + assert len(metrics) == 1 + assert metrics[0]["name"] == "test.keep" + assert before_metric_called