Skip to content

Commit ba91268

Browse files
committed
track batcher file
1 parent 93c83e2 commit ba91268

File tree

1 file changed

+189
-0
lines changed

1 file changed

+189
-0
lines changed

sentry_sdk/_span_batcher.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
# This file is experimental and its contents may change without notice. This is
2+
# a simple POC buffer implementation. Eventually, we should switch to a telemetry
3+
# buffer: https://develop.sentry.dev/sdk/telemetry/telemetry-buffer/
4+
5+
import os
6+
import random
7+
import threading
8+
from collections import defaultdict
9+
from datetime import datetime, timezone
10+
from typing import Optional, List, Callable, TYPE_CHECKING, Any
11+
12+
from sentry_sdk.consts import SPANSTATUS
13+
from sentry_sdk.utils import format_attribute_value, format_timestamp, safe_repr
14+
from sentry_sdk.envelope import Envelope, Item, PayloadRef
15+
from sentry_sdk.tracing import Transaction
16+
17+
if TYPE_CHECKING:
18+
from sentry_sdk.tracing import Span
19+
from sentry_sdk._types import SpanV2
20+
21+
22+
class SpanBatcher:
    """Buffer finished spans and periodically flush them as Sentry envelopes.

    Spans are bucketed by ``trace_id`` because spans from different traces
    cannot share an envelope (the envelope carries a single trace header);
    each bucket is sent in its own envelope. A daemon thread flushes the
    buffer every ``FLUSH_WAIT_TIME`` seconds (plus jitter), or earlier when
    the buffer reaches ``MAX_SPANS_BEFORE_FLUSH`` spans. Once the total
    buffered span count reaches ``MAX_SPANS_BEFORE_DROP``, new spans are
    dropped and reported via the record-lost callback.

    This is an experimental POC implementation and may change without
    notice; eventually it should be replaced by a telemetry buffer:
    https://develop.sentry.dev/sdk/telemetry/telemetry-buffer/
    """

    # TODO[span-first]: Adjust limits. However, there's still a restriction of
    # at most 1000 spans per envelope.
    MAX_SPANS_BEFORE_FLUSH = 1_000
    MAX_SPANS_BEFORE_DROP = 2_000
    FLUSH_WAIT_TIME = 5.0

    def __init__(
        self,
        capture_func,  # type: Callable[[Envelope], None]
        record_lost_func,  # type: Callable[..., None]
    ):
        # type: (...) -> None
        # Spans from different traces cannot be emitted in the same envelope
        # since the envelope contains a shared trace header. That's why we
        # bucket by trace_id, so that we can then send the buckets each in its
        # own envelope.
        # trace_id -> span buffer
        self._span_buffer = defaultdict(list)  # type: dict[str, list[Span]]
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        # Protects _span_buffer and the flusher-thread bookkeeping below.
        self._lock = threading.Lock()

        # Set to wake the flusher early (buffer full, or kill()).
        self._flush_event = threading.Event()  # type: threading.Event

        # Lazily started (and restarted after fork) by _ensure_thread().
        self._flusher = None  # type: Optional[threading.Thread]
        self._flusher_pid = None  # type: Optional[int]

    def _ensure_thread(self):
        # type: (...) -> bool
        """For forking processes we might need to restart this thread.
        This ensures that our process actually has that thread running.

        Returns True if a flusher thread is running in this process.
        """
        if not self._running:
            return False

        pid = os.getpid()
        if self._flusher_pid == pid:
            return True

        with self._lock:
            # Recheck to make sure another thread didn't get here and start
            # the flusher in the meantime
            if self._flusher_pid == pid:
                return True

            self._flusher_pid = pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state
                # that no longer allows us to spawn a thread and we have to
                # bail.
                self._running = False
                return False

        return True

    def _flush_loop(self):
        # type: (...) -> None
        """Body of the flusher thread: flush periodically or when woken.

        The random jitter avoids many processes flushing in lockstep.
        """
        while self._running:
            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
            self._flush_event.clear()
            self._flush()

    def get_size(self):
        # type: () -> int
        """Return the total number of buffered spans across all traces.

        Caller is responsible for locking before checking this.
        """
        return sum(len(buffer) for buffer in self._span_buffer.values())

    def add(self, span):
        # type: (Span) -> None
        """Buffer *span*, dropping it (and recording the loss) on overflow.

        Triggers an early flush once the buffer reaches
        MAX_SPANS_BEFORE_FLUSH spans.
        """
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            if self.get_size() >= self.MAX_SPANS_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            if (
                self.get_size() >= self.MAX_SPANS_BEFORE_FLUSH
            ):  # TODO[span-first] should this be per bucket?
                self._flush_event.set()

    def kill(self):
        # type: (...) -> None
        """Stop the flusher thread. Buffered spans are NOT flushed first."""
        if self._flusher is None:
            return

        self._running = False
        # Wake the flusher so it observes _running == False and exits.
        self._flush_event.set()
        self._flusher = None

    def flush(self):
        # type: (...) -> None
        """Synchronously flush all buffered spans from the calling thread."""
        self._flush()

    @staticmethod
    def _span_to_transport_format(span):
        # type: (Span) -> SpanV2
        """Convert a Span into the span.v2 wire-format dict."""
        res = {
            "trace_id": span.trace_id,
            "span_id": span.span_id,
            "name": span.name,
            # Wire format only distinguishes ok/error; UNSET maps to OK.
            "status": SPANSTATUS.OK
            if span.status in (SPANSTATUS.OK, SPANSTATUS.UNSET)
            else SPANSTATUS.ERROR,
            # A span is a segment (root) span iff it is its own transaction.
            "is_segment": span.containing_transaction == span,
            "start_timestamp": span.start_timestamp,
            "end_timestamp": span.timestamp,
        }

        if span.parent_span_id:
            res["parent_span_id"] = span.parent_span_id

        # NOTE(review): subscript access is inconsistent with the attribute
        # access above — confirm Span implements __getitem__ (or should this
        # be span.attributes?).
        if span["attributes"]:
            res["attributes"] = {
                k: format_attribute_value(v) for (k, v) in span["attributes"].items()
            }

        return res

    def _flush(self):
        # type: (...) -> Optional[Envelope]
        """Send every buffered trace bucket in its own envelope.

        Returns the last envelope captured, or None if the buffer was empty.
        """
        envelope = None  # type: Optional[Envelope]

        with self._lock:
            if len(self._span_buffer) == 0:
                return None

            # BUG FIX: iterate .items() — iterating the dict directly yields
            # only the trace_id keys, so unpacking into (trace_id, spans)
            # raised at runtime.
            for trace_id, spans in self._span_buffer.items():
                envelope = Envelope(
                    headers={
                        "sent_at": format_timestamp(datetime.now(timezone.utc)),
                    }
                    # TODO[span-first] more headers
                )

                envelope.add_item(
                    Item(
                        type="span",
                        content_type="application/vnd.sentry.items.span.v2+json",
                        headers={
                            "item_count": len(spans),
                        },
                        payload=PayloadRef(
                            json={
                                "items": [
                                    self._span_to_transport_format(span)
                                    for span in spans
                                ]
                            }
                        ),
                    )
                )

                # BUG FIX: capture inside the loop — previously only the last
                # trace's envelope was captured while the whole buffer was
                # cleared, silently dropping every other trace's spans.
                self._capture_func(envelope)

            self._span_buffer.clear()

        return envelope

0 commit comments

Comments
 (0)