Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# This file is experimental and its contents may change without notice. This is
# a simple POC buffer implementation. Eventually, we should switch to a telemetry
# buffer: https://develop.sentry.dev/sdk/telemetry/telemetry-buffer/

import os
import random
import threading
from collections import defaultdict
from datetime import datetime, timezone
from typing import Optional, List, Callable, TYPE_CHECKING, Any

from sentry_sdk.consts import SPANSTATUS
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.tracing import Transaction

if TYPE_CHECKING:
from sentry_sdk.tracing import Span
from sentry_sdk._types import SpanV2


class SpanBatcher:
# TODO[span-first]: Adjust limits. Protocol dictates at most 1000 spans
# in an envelope.
MAX_SPANS_BEFORE_FLUSH = 1_000
MAX_SPANS_BEFORE_DROP = 2_000
FLUSH_WAIT_TIME = 5.0

def __init__(
self,
capture_func, # type: Callable[[Envelope], None]
record_lost_func, # type: Callable[..., None]
):
# type: (...) -> None
# Spans from different traces cannot be emitted in the same envelope
# since the envelope contains a shared trace header. That's why we bucket
# by trace_id, so that we can then send the buckets each in its own
# envelope.
# trace_id -> span buffer
self._span_buffer = defaultdict(list) # type: dict[str, list[Span]]
self._capture_func = capture_func
self._record_lost_func = record_lost_func
self._running = True
self._lock = threading.Lock()

self._flush_event = threading.Event() # type: threading.Event

self._flusher = None # type: Optional[threading.Thread]
self._flusher_pid = None # type: Optional[int]

def _ensure_thread(self):
# type: (...) -> bool
"""For forking processes we might need to restart this thread.
This ensures that our process actually has that thread running.
"""
if not self._running:
return False

pid = os.getpid()
if self._flusher_pid == pid:
return True

with self._lock:
# Recheck to make sure another thread didn't get here and start the
# the flusher in the meantime
if self._flusher_pid == pid:
return True

self._flusher_pid = pid

self._flusher = threading.Thread(target=self._flush_loop)
self._flusher.daemon = True

try:
self._flusher.start()
except RuntimeError:
# Unfortunately at this point the interpreter is in a state that no
# longer allows us to spawn a thread and we have to bail.
self._running = False
return False

return True

def _flush_loop(self):
# type: (...) -> None
while self._running:
self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
self._flush_event.clear()
self._flush()

def get_size(self):
# type: () -> int
# caller is responsible for locking before checking this
return sum(len(buffer) for buffer in self._span_buffer.values())

def add(self, span):
# type: (Span) -> None
if not self._ensure_thread() or self._flusher is None:
return None

with self._lock:
if self.get_size() >= self.MAX_SPANS_BEFORE_DROP:
self._record_lost_func(
reason="queue_overflow",
data_category="span",
quantity=1,
)
return None

self._span_buffer[span.trace_id].append(span)
if self.get_size() >= self.MAX_SPANS_BEFORE_FLUSH:
self._flush_event.set()

def kill(self):
# type: (...) -> None
if self._flusher is None:
return

self._running = False
self._flush_event.set()
self._flusher = None

def flush(self):
# type: (...) -> None
self._flush()

@staticmethod
def _span_to_transport_format(span):
# type: (Span) -> SpanV2
from sentry_sdk.utils import attribute_value_to_transport_format, safe_repr

is_segment = span.containing_transaction == span

res = {
"trace_id": span.trace_id,
"span_id": span.span_id,
"name": span.name if is_segment else span.description,
"status": SPANSTATUS.OK
if span.status == SPANSTATUS.OK
else SPANSTATUS.INTERNAL_ERROR,
"is_segment": is_segment,
"start_timestamp": span.start_timestamp.timestamp(), # TODO[span-first]
"end_timestamp": span.timestamp.timestamp(),
}

if span.parent_span_id:
res["parent_span_id"] = span.parent_span_id

if span._attributes:
res["attributes"] = {
k: attribute_value_to_transport_format(v)
for (k, v) in span._attributes.items()
}

return res

def _flush(self):
# type: (...) -> Optional[Envelope]
from sentry_sdk.utils import format_timestamp

with self._lock:
if len(self._span_buffer) == 0:
return None

for trace_id, spans in self._span_buffer.items():
if spans:
trace_context = spans[0].get_trace_context()
dsc = trace_context.get("dynamic_sampling_context")
# XXX[span-first]: empty dsc?

envelope = Envelope(
headers={
"sent_at": format_timestamp(datetime.now(timezone.utc)),
"trace": dsc,
}
)

envelope.add_item(
Item(
type="span",
content_type="application/vnd.sentry.items.span.v2+json",
headers={
"item_count": len(spans),
},
payload=PayloadRef(
json={
"items": [
self._span_to_transport_format(span)
for span in spans
]
}
),
)
)

self._span_buffer.clear()

self._capture_func(envelope)
return envelope
45 changes: 35 additions & 10 deletions sentry_sdk/_types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, TypeVar, Union

from sentry_sdk.consts import SPANSTATUS

# Re-exported for compat, since code out there in the wild might use this variable.
MYPY = TYPE_CHECKING
Expand Down Expand Up @@ -222,28 +223,33 @@ class SDKInfo(TypedDict):
# TODO: Make a proper type definition for this (PRs welcome!)
Hint = Dict[str, Any]

AttributeValue = (
str | bool | float | int | list[str] | list[bool] | list[float] | list[int]
)
Attributes = dict[str, AttributeValue]

SerializedAttributeValue = TypedDict(
"SerializedAttributeValue",
{
"type": Literal["string", "boolean", "double", "integer"],
"value": AttributeValue,
},
)

Log = TypedDict(
"Log",
{
"severity_text": str,
"severity_number": int,
"body": str,
"attributes": dict[str, str | bool | float | int],
"attributes": Attributes,
"time_unix_nano": int,
"trace_id": Optional[str],
},
)

MetricType = Literal["counter", "gauge", "distribution"]

MetricAttributeValue = TypedDict(
"MetricAttributeValue",
{
"value": Union[str, bool, float, int],
"type": Literal["string", "boolean", "double", "integer"],
},
)

Metric = TypedDict(
"Metric",
{
Expand All @@ -254,12 +260,29 @@ class SDKInfo(TypedDict):
"type": MetricType,
"value": float,
"unit": Optional[str],
"attributes": dict[str, str | bool | float | int],
"attributes": Attributes,
},
)

MetricProcessor = Callable[[Metric, Hint], Optional[Metric]]

# This is the V2 span format
# https://develop.sentry.dev/sdk/telemetry/spans/span-protocol/
SpanV2 = TypedDict(
"SpanV2",
{
"trace_id": str,
"span_id": str,
"parent_span_id": Optional[str],
"name": str,
"status": Literal[SPANSTATUS.OK, SPANSTATUS.ERROR],
"is_segment": bool,
"start_timestamp": float,
"end_timestamp": float,
"attributes": Attributes,
},
)

# TODO: Make a proper type definition for this (PRs welcome!)
Breadcrumb = Dict[str, Any]

Expand Down Expand Up @@ -337,3 +360,5 @@ class SDKInfo(TypedDict):
)

HttpStatusCodeRange = Union[int, Container[int]]

TraceLifecycleMode = Literal["static", "stream"]
32 changes: 32 additions & 0 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import sentry_sdk
from sentry_sdk._compat import PY37, check_uwsgi_thread_support
from sentry_sdk._metrics_batcher import MetricsBatcher
from sentry_sdk._span_batcher import SpanBatcher
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import (
AnnotatedValue,
ContextVar,
Expand All @@ -26,6 +28,7 @@
logger,
get_before_send_log,
get_before_send_metric,
get_default_attributes,
has_logs_enabled,
has_metrics_enabled,
)
Expand Down Expand Up @@ -187,6 +190,7 @@ def __init__(self, options=None):
self.monitor = None # type: Optional[Monitor]
self.log_batcher = None # type: Optional[LogBatcher]
self.metrics_batcher = None # type: Optional[MetricsBatcher]
self._span_batcher = None # type: Optional[SpanBatcher]

def __getstate__(self, *args, **kwargs):
# type: (*Any, **Any) -> Any
Expand Down Expand Up @@ -400,6 +404,13 @@ def _record_lost_event(
record_lost_func=_record_lost_event,
)

self._span_batcher = None
if self.options["_experiments"].get("trace_lifecycle", None) == "stream":
self._span_batcher = SpanBatcher(
capture_func=_capture_envelope,
record_lost_func=_record_lost_event,
)

max_request_body_size = ("always", "never", "small", "medium")
if self.options["max_request_body_size"] not in max_request_body_size:
raise ValueError(
Expand Down Expand Up @@ -929,8 +940,28 @@ def capture_event(

return return_value

def _capture_span(self, span):
# type: (Span) -> None
# Used for span streaming (trace_lifecycle == "stream").
if not has_span_streaming_enabled(self.options):
return

attributes = get_default_attributes()
span._attributes = attributes | span._attributes

segment = span.containing_transaction
span._attributes["sentry.segment.id"] = segment.span_id
span._attributes["sentry.segment.name"] = segment.name

if self._span_batcher:
logger.debug(
f"[Tracing] Adding span {span.span_id} of segment {segment.span_id} to batcher"
)
self._span_batcher.add(span)

def _capture_log(self, log):
# type: (Optional[Log]) -> None
# TODO[ivana]: Use get_default_attributes here
if not has_logs_enabled(self.options) or log is None:
return

Expand Down Expand Up @@ -999,6 +1030,7 @@ def _capture_log(self, log):

def _capture_metric(self, metric):
# type: (Optional[Metric]) -> None
# TODO[ivana]: Use get_default_attributes here
if not has_metrics_enabled(self.options) or metric is None:
return

Expand Down
10 changes: 10 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ class CompressionAlgo(Enum):
from typing import Sequence
from typing import Tuple
from typing import AbstractSet
from typing import Pattern
from typing_extensions import Literal
from typing_extensions import TypedDict

from sentry_sdk._types import (
Attributes,
BreadcrumbProcessor,
ContinuousProfilerMode,
Event,
Expand All @@ -56,6 +58,7 @@ class CompressionAlgo(Enum):
Metric,
ProfilerMode,
TracesSampler,
TraceLifecycleMode,
TransactionProcessor,
)

Expand All @@ -81,6 +84,13 @@ class CompressionAlgo(Enum):
"before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
"enable_metrics": Optional[bool],
"before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]],
"trace_lifecycle": Optional[TraceLifecycleMode],
"ignore_spans": Optional[
list[
Union[str, Pattern],
dict[Union[Literal["name", "attributes"], Union[str, Attributes]],],
]
],
},
total=False,
)
Expand Down
Loading
Loading