class SpanBatcher:
    """Buffer finished spans per trace and send them out in batched envelopes.

    Spans are bucketed by ``trace_id`` because an envelope carries one shared
    trace header, so spans of different traces cannot travel together. A
    daemon thread flushes the buffer every ``FLUSH_WAIT_TIME`` seconds (plus
    up to one second of random jitter), or earlier once
    ``MAX_SPANS_BEFORE_FLUSH`` spans are buffered. When
    ``MAX_SPANS_BEFORE_DROP`` spans are buffered, new spans are dropped and
    reported through ``record_lost_func``.
    """

    # TODO[span-first]: Adjust limits. Protocol dictates at most 1000 spans
    # in an envelope.
    MAX_SPANS_BEFORE_FLUSH = 1_000
    MAX_SPANS_BEFORE_DROP = 2_000
    FLUSH_WAIT_TIME = 5.0

    def __init__(
        self,
        capture_func,  # type: Callable[[Envelope], None]
        record_lost_func,  # type: Callable[..., None]
    ):
        # type: (...) -> None
        """Create an idle batcher; the flusher thread is started lazily.

        :param capture_func: Called with every envelope produced by a flush.
        :param record_lost_func: Called with ``reason``, ``data_category`` and
            ``quantity`` keyword arguments for every dropped span.
        """
        # Spans from different traces cannot be emitted in the same envelope
        # since the envelope contains a shared trace header. That's why we bucket
        # by trace_id, so that we can then send the buckets each in its own
        # envelope.
        # trace_id -> span buffer
        self._span_buffer = defaultdict(list)  # type: dict[str, list[Span]]
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()

        self._flush_event = threading.Event()  # type: threading.Event

        self._flusher = None  # type: Optional[threading.Thread]
        self._flusher_pid = None  # type: Optional[int]

    def _ensure_thread(self):
        # type: (...) -> bool
        """For forking processes we might need to restart this thread.
        This ensures that our process actually has that thread running.

        :return: ``True`` if a flusher thread is registered for the current
            process, ``False`` if the batcher is stopped or the thread could
            not be started.
        """
        if not self._running:
            return False

        pid = os.getpid()
        if self._flusher_pid == pid:
            return True

        with self._lock:
            # Recheck to make sure another thread didn't get here and start
            # the flusher in the meantime
            if self._flusher_pid == pid:
                return True

            self._flusher_pid = pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state that no
                # longer allows us to spawn a thread and we have to bail.
                self._running = False
                return False

        return True

    def _flush_loop(self):
        # type: (...) -> None
        """Flusher thread body: flush on every wake-up until the batcher stops."""
        while self._running:
            # Wake up on timeout or as soon as add()/kill() sets the event.
            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
            self._flush_event.clear()
            self._flush()

    def get_size(self):
        # type: () -> int
        """Return the number of buffered spans across all traces."""
        # caller is responsible for locking before checking this
        return sum(len(buffer) for buffer in self._span_buffer.values())

    def add(self, span):
        # type: (Span) -> None
        """Buffer ``span`` under its ``trace_id``.

        The span is silently ignored when no flusher thread is available, and
        dropped (and reported as ``queue_overflow``) when the buffer already
        holds ``MAX_SPANS_BEFORE_DROP`` spans.
        """
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            if self.get_size() >= self.MAX_SPANS_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            if self.get_size() >= self.MAX_SPANS_BEFORE_FLUSH:
                # Wake the flusher instead of waiting for the next timeout.
                self._flush_event.set()

    def kill(self):
        # type: (...) -> None
        """Stop the batcher; the flusher thread is woken up so it can exit."""
        if self._flusher is None:
            return

        self._running = False
        self._flush_event.set()
        self._flusher = None

    def flush(self):
        # type: (...) -> None
        """Synchronously send everything that is currently buffered."""
        self._flush()

    @staticmethod
    def _span_to_transport_format(span):
        # type: (Span) -> SpanV2
        """Convert ``span`` into the dict sent as one entry of a span item."""
        from sentry_sdk.utils import attribute_value_to_transport_format

        is_segment = span.containing_transaction == span

        res = {
            "trace_id": span.trace_id,
            "span_id": span.span_id,
            "name": span.name if is_segment else span.description,
            # NOTE(review): every non-OK status is reported as INTERNAL_ERROR
            # -- confirm this matches the status values the span protocol
            # accepts.
            "status": SPANSTATUS.OK
            if span.status == SPANSTATUS.OK
            else SPANSTATUS.INTERNAL_ERROR,
            "is_segment": is_segment,
            "start_timestamp": span.start_timestamp.timestamp(),  # TODO[span-first]
            "end_timestamp": span.timestamp.timestamp(),
        }

        if span.parent_span_id:
            res["parent_span_id"] = span.parent_span_id

        if span._attributes:
            res["attributes"] = {
                k: attribute_value_to_transport_format(v)
                for (k, v) in span._attributes.items()
            }

        return res

    def _build_envelope(self, spans):
        # type: (List[Span]) -> Envelope
        """Build one envelope holding all ``spans`` of a single trace."""
        from sentry_sdk.utils import format_timestamp

        # All spans in a bucket share the trace, so the first one provides
        # the trace header.
        trace_context = spans[0].get_trace_context()
        dsc = trace_context.get("dynamic_sampling_context")
        # XXX[span-first]: empty dsc?

        envelope = Envelope(
            headers={
                "sent_at": format_timestamp(datetime.now(timezone.utc)),
                "trace": dsc,
            }
        )
        envelope.add_item(
            Item(
                type="span",
                content_type="application/vnd.sentry.items.span.v2+json",
                headers={
                    "item_count": len(spans),
                },
                payload=PayloadRef(
                    json={
                        "items": [
                            self._span_to_transport_format(span) for span in spans
                        ]
                    }
                ),
            )
        )
        return envelope

    def _flush(self):
        # type: (...) -> Optional[Envelope]
        """Drain the buffer and send one envelope per buffered trace.

        :return: The last envelope that was sent, or ``None`` when there was
            nothing to send.
        """
        envelopes = []  # type: List[Envelope]

        with self._lock:
            if not self._span_buffer:
                return None

            # Every trace gets its own envelope; collect all of them so that
            # no bucket is lost when several traces are buffered at once.
            for spans in self._span_buffer.values():
                if spans:
                    envelopes.append(self._build_envelope(spans))

            self._span_buffer.clear()

        # Hand the envelopes over outside the lock so add() is not blocked
        # while they are being captured.
        for envelope in envelopes:
            self._capture_func(envelope)

        return envelopes[-1] if envelopes else None
Attributes, }, ) MetricProcessor = Callable[[Metric, Hint], Optional[Metric]] + # This is the V2 span format + # https://develop.sentry.dev/sdk/telemetry/spans/span-protocol/ + SpanV2 = TypedDict( + "SpanV2", + { + "trace_id": str, + "span_id": str, + "parent_span_id": Optional[str], + "name": str, + "status": Literal[SPANSTATUS.OK, SPANSTATUS.ERROR], + "is_segment": bool, + "start_timestamp": float, + "end_timestamp": float, + "attributes": Attributes, + }, + ) + # TODO: Make a proper type definition for this (PRs welcome!) Breadcrumb = Dict[str, Any] @@ -337,3 +360,5 @@ class SDKInfo(TypedDict): ) HttpStatusCodeRange = Union[int, Container[int]] + + TraceLifecycleMode = Literal["static", "stream"] diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index fa17dbe18c..a7d99135cc 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -11,6 +11,8 @@ import sentry_sdk from sentry_sdk._compat import PY37, check_uwsgi_thread_support from sentry_sdk._metrics_batcher import MetricsBatcher +from sentry_sdk._span_batcher import SpanBatcher +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.utils import ( AnnotatedValue, ContextVar, @@ -26,6 +28,7 @@ logger, get_before_send_log, get_before_send_metric, + get_default_attributes, has_logs_enabled, has_metrics_enabled, ) @@ -187,6 +190,7 @@ def __init__(self, options=None): self.monitor = None # type: Optional[Monitor] self.log_batcher = None # type: Optional[LogBatcher] self.metrics_batcher = None # type: Optional[MetricsBatcher] + self._span_batcher = None # type: Optional[SpanBatcher] def __getstate__(self, *args, **kwargs): # type: (*Any, **Any) -> Any @@ -400,6 +404,13 @@ def _record_lost_event( record_lost_func=_record_lost_event, ) + self._span_batcher = None + if self.options["_experiments"].get("trace_lifecycle", None) == "stream": + self._span_batcher = SpanBatcher( + capture_func=_capture_envelope, + record_lost_func=_record_lost_event, + ) + max_request_body_size = 
# Method of the client class in sentry_sdk/client.py.
def _capture_span(self, span):
    # type: (Span) -> None
    """Enrich a finished span with default attributes and hand it to the batcher.

    Used for span streaming (trace_lifecycle == "stream"); does nothing when
    streaming is not enabled in the client options.

    :param span: The finished span. Its ``_attributes`` are replaced by a new
        dict in which attributes set on the span override the defaults.
    """
    if not has_span_streaming_enabled(self.options):
        return

    # Defaults first so attributes set on the span itself win. Written with
    # unpacking because the ``dict | dict`` operator only exists on
    # Python 3.9+.
    span._attributes = {**get_default_attributes(), **span._attributes}

    # NOTE(review): containing_transaction is treated as optional here so a
    # span without a segment cannot raise AttributeError -- confirm whether
    # such spans can reach this method.
    segment = span.containing_transaction
    segment_id = None
    if segment is not None:
        segment_id = segment.span_id
        span._attributes["sentry.segment.id"] = segment_id
        span._attributes["sentry.segment.name"] = segment.name

    if self._span_batcher:
        logger.debug(
            f"[Tracing] Adding span {span.span_id} of segment {segment_id} to batcher"
        )
        self._span_batcher.add(span)
Hint], Optional[Metric]]], + "trace_lifecycle": Optional[TraceLifecycleMode], + "ignore_spans": Optional[ + list[ + Union[str, Pattern], + dict[Union[Literal["name", "attributes"], Union[str, Attributes]],], + ] + ], }, total=False, ) diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py index 2038dc4501..55cb220f0d 100644 --- a/sentry_sdk/scope.py +++ b/sentry_sdk/scope.py @@ -22,6 +22,7 @@ from sentry_sdk.session import Session from sentry_sdk.tracing_utils import ( Baggage, + has_span_streaming_enabled, has_tracing_enabled, normalize_incoming_data, PropagationContext, @@ -1116,9 +1117,10 @@ def start_transaction( transaction.set_profiler_id(get_profiler_id()) # we don't bother to keep spans if we already know we're not going to - # send the transaction - max_spans = (client.options["_experiments"].get("max_spans")) or 1000 - transaction.init_span_recorder(maxlen=max_spans) + # send the transaction or if we're in streaming mode + if not has_span_streaming_enabled(client.options): + max_spans = (client.options["_experiments"].get("max_spans")) or 1000 + transaction.init_span_recorder(maxlen=max_spans) return transaction diff --git a/sentry_sdk/tracing.py b/sentry_sdk/tracing.py index 5a9a053418..455b992ef9 100644 --- a/sentry_sdk/tracing.py +++ b/sentry_sdk/tracing.py @@ -13,6 +13,7 @@ logger, nanosecond_time, should_be_treated_as_error, + serialize_attribute, ) from typing import TYPE_CHECKING @@ -44,6 +45,8 @@ MeasurementUnit, SamplingContext, MeasurementValue, + Attributes, + AttributeValue, ) class SpanKwargs(TypedDict, total=False): @@ -281,6 +284,8 @@ class Span: "name", "_flags", "_flags_capacity", + "_mode", + "_attributes", ) def __init__( @@ -299,6 +304,7 @@ def __init__( scope=None, # type: Optional[sentry_sdk.Scope] origin="manual", # type: str name=None, # type: Optional[str] + attributes=None, # type: Optional[dict] ): # type: (...) 
# Methods of ``Span`` in sentry_sdk/tracing.py.
def get_attributes(self):
    # type: () -> Attributes
    """Return the span's attribute mapping (the live dict, not a copy)."""
    return self._attributes

def set_attribute(self, attribute, value):
    # type: (str, AttributeValue) -> None
    """Store ``value`` under ``attribute`` after ``serialize_attribute``."""
    self._attributes[attribute] = serialize_attribute(value)

def remove_attribute(self, attribute):
    # type: (str) -> None
    """Remove ``attribute`` from the span; unknown names are ignored."""
    self._attributes.pop(attribute, None)

def finish(self, scope=None, end_timestamp=None):
    # type: (Optional[sentry_sdk.Scope], Optional[Union[float, datetime]]) -> Optional[str]
    """
    Sets the end timestamp of the span.

    Additionally it also creates a breadcrumb from the span,
    if the span represents a database or HTTP request.

    In span streaming mode the finished span is also handed to the client,
    provided its containing transaction is sampled.

    :param scope: The scope to use for this transaction.
        If not provided, the current scope will be used.
    :param end_timestamp: Optional timestamp that should
        be used as timestamp instead of the current time.

    :return: Always ``None``. The type is ``Optional[str]`` to match
        the return value of :py:meth:`sentry_sdk.tracing.Transaction.finish`.
    """
    if self.timestamp is not None:
        # Already finished: _finish() ignores the call, and capturing the
        # span again would stream a duplicate.
        return None

    self._finish(scope, end_timestamp)

    client = sentry_sdk.get_client()
    if not client.is_active() or not has_span_streaming_enabled(client.options):
        return None

    # NOTE(review): containing_transaction is treated as optional so a span
    # without a transaction is skipped instead of raising AttributeError --
    # confirm against the property definition.
    segment = self.containing_transaction
    if segment is not None and segment.sampled:
        client._capture_span(self)

    return None
""" - - # We must explicitly check self.sampled is False since self.sampled can be None - return self._span_recorder is not None or self.sampled is False + if self._mode == "static": + # We must explicitly check self.sampled is False since self.sampled can be None + return self._span_recorder is not None or self.sampled is False + else: + return True def __enter__(self): # type: () -> Transaction @@ -962,7 +1004,8 @@ def finish( ): # type: (...) -> Optional[str] """Finishes the transaction and sends it to Sentry. - All finished spans in the transaction will also be sent to Sentry. + If we're in non-streaming mode, all finished spans in the transaction + will also be sent to Sentry at this point. :param scope: The Scope to use for this transaction. If not provided, the current Scope will be used. @@ -990,7 +1033,7 @@ def finish( # We have no active client and therefore nowhere to send this transaction. return None - if self._span_recorder is None: + if self._mode == "static" and self._span_recorder is None: # Explicit check against False needed because self.sampled might be None if self.sampled is False: logger.debug("Discarding transaction because sampled = False") @@ -1020,11 +1063,12 @@ def finish( ) self.name = "" - super().finish(scope, end_timestamp) + super()._finish(scope, end_timestamp) status_code = self._data.get(SPANDATA.HTTP_STATUS_CODE) if ( - status_code is not None + self._mode == "static" + and status_code is not None and status_code in client.options["trace_ignore_status_codes"] ): logger.debug( @@ -1056,6 +1100,11 @@ def finish( return None + if self._mode == "stream": + if self.containing_transaction.sampled: + client._capture_span(self) + return + finished_spans = [ span.to_json() for span in self._span_recorder.spans @@ -1097,7 +1146,6 @@ def finish( self._profile = None event["measurements"] = self._measurements - return scope.capture_event(event) def set_measurement(self, name, value, unit=""): @@ -1472,5 +1520,6 @@ def 
def has_span_streaming_enabled(options):
    # type: (Optional[Dict[str, Any]]) -> bool
    """Return ``True`` if ``_experiments["trace_lifecycle"]`` is ``"stream"``.

    ``None`` options and a missing or ``None`` ``_experiments`` entry all
    count as disabled.
    """
    if options is None:
        return False

    return (options.get("_experiments") or {}).get("trace_lifecycle") == "stream"


def serialize_attribute(value):
    # type: (Any) -> AttributeValue
    """Coerce ``value`` into something that can be stored as an attribute.

    Primitives (``int``, ``str``, ``float``, ``bool``) are returned as they
    are. Lists and tuples whose items all share one of these types are
    returned as a list. Everything else is replaced by its ``safe_repr``.
    """
    # check for allowed primitives
    if isinstance(value, (int, str, float, bool)):
        return value

    # lists are allowed too, as long as they don't mix types
    if isinstance(value, (list, tuple)):
        type_checks = (
            lambda item: isinstance(item, bool),
            # bool is a subclass of int; exclude it so that [True, 1] is not
            # mistaken for a homogeneous list of integers.
            lambda item: isinstance(item, int) and not isinstance(item, bool),
            lambda item: isinstance(item, str),
            lambda item: isinstance(item, float),
        )
        for matches in type_checks:
            if all(matches(item) for item in value):
                return list(value)

    return safe_repr(value)


def attribute_value_to_transport_format(value):
    # type: (Any) -> dict[str, bool | str | int | float]
    """Wrap ``value`` into the ``{"value": ..., "type": ...}`` transport form.

    Values that are not ``bool``, ``int``, ``float`` or ``str`` (including
    lists) are sent as the string produced by ``safe_repr``.
    """
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return {"value": value, "type": "boolean"}

    if isinstance(value, int):
        return {"value": value, "type": "integer"}

    if isinstance(value, float):
        return {"value": value, "type": "double"}

    if isinstance(value, str):
        return {"value": value, "type": "string"}

    return {"value": safe_repr(value), "type": "string"}


def get_default_attributes():
    # type: () -> Attributes
    """Collect the attributes attached to every piece of telemetry.

    Contains the SDK name and version, the ``server_name``, ``environment``
    and ``release`` options when they are set, the current thread's id and
    name when available and, if sending default PII is allowed, the id,
    username and email of the user on the isolation scope.
    """
    # TODO[ivana]: standardize attr names into an enum/take from sentry convs
    from sentry_sdk.client import SDK_INFO
    from sentry_sdk.scope import should_send_default_pii

    attributes = {}

    attributes["sentry.sdk.name"] = SDK_INFO["name"]
    attributes["sentry.sdk.version"] = SDK_INFO["version"]

    options = sentry_sdk.get_client().options

    server_name = options.get("server_name")
    if server_name is not None:
        attributes[SPANDATA.SERVER_ADDRESS] = server_name

    environment = options.get("environment")
    if environment is not None:
        attributes["sentry.environment"] = environment

    release = options.get("release")
    if release is not None:
        attributes["sentry.release"] = release

    thread_id, thread_name = get_current_thread_meta()
    if thread_id is not None:
        attributes["thread.id"] = thread_id
    if thread_name is not None:
        attributes["thread.name"] = thread_name

    if should_send_default_pii():
        # The user, if present, is always set on the isolation scope.
        user = sentry_sdk.get_isolation_scope()._user
        if user is not None:
            for attribute, user_attribute in (
                ("user.id", "id"),
                ("user.name", "username"),
                ("user.email", "email"),
            ):
                # Look the value up under the user dict's own key
                # ("id", "username", "email"), not the attribute name.
                user_value = user.get(user_attribute)
                if user_value is not None:
                    attributes[attribute] = user_value

    return attributes