Skip to content

Commit 7461867

Browse files
authored
feat(spans): Evict spans during insert (#92393)
A proof of concept that limits the number of spans per segment during insertion. Internally, each segment is stored in a sorted set scored by the spans' end timestamps; when a segment grows beyond the limit, the spans with the earliest end timestamps are evicted. This ensures that spans higher up in the hierarchy and more recent spans are prioritized during eviction.
1 parent 28e3db2 commit 7461867

File tree

6 files changed

+55
-19
lines changed

6 files changed

+55
-19
lines changed

src/sentry/scripts/spans/add-buffer.lua

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ local main_redirect_key = string.format("span-buf:sr:{%s}", project_and_trace)
2727
local set_span_id = parent_span_id
2828
local redirect_depth = 0
2929

30-
for i = 0, 10000 do -- theoretically this limit means that segment trees of depth 10k may not be joined together correctly.
30+
for i = 0, 1000 do
3131
local new_set_span = redis.call("hget", main_redirect_key, set_span_id)
3232
redirect_depth = i
3333
if not new_set_span or new_set_span == set_span_id then
@@ -40,19 +40,29 @@ end
4040
redis.call("hset", main_redirect_key, span_id, set_span_id)
4141
redis.call("expire", main_redirect_key, set_timeout)
4242

43+
local span_count = 0
44+
4345
local set_key = string.format("span-buf:s:{%s}:%s", project_and_trace, set_span_id)
44-
if not is_root_span and redis.call("scard", span_key) > 0 then
45-
redis.call("sunionstore", set_key, set_key, span_key)
46+
if not is_root_span and redis.call("zcard", span_key) > 0 then
47+
span_count = redis.call("zunionstore", set_key, 2, set_key, span_key)
4648
redis.call("unlink", span_key)
4749
end
4850

4951
local parent_key = string.format("span-buf:s:{%s}:%s", project_and_trace, parent_span_id)
50-
if set_span_id ~= parent_span_id and redis.call("scard", parent_key) > 0 then
51-
redis.call("sunionstore", set_key, set_key, parent_key)
52+
if set_span_id ~= parent_span_id and redis.call("zcard", parent_key) > 0 then
53+
span_count = redis.call("zunionstore", set_key, 2, set_key, parent_key)
5254
redis.call("unlink", parent_key)
5355
end
5456
redis.call("expire", set_key, set_timeout)
5557

58+
if span_count == 0 then
59+
span_count = redis.call("zcard", set_key)
60+
end
61+
62+
if span_count > 1000 then
63+
redis.call("zpopmin", set_key, span_count - 1000)
64+
end
65+
5666
local has_root_span_key = string.format("span-buf:hrs:%s", set_key)
5767
local has_root_span = redis.call("get", has_root_span_key) == "1" or is_root_span
5868
if has_root_span then

src/sentry/spans/buffer.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class Span(NamedTuple):
116116
parent_span_id: str | None
117117
project_id: int
118118
payload: bytes
119+
end_timestamp_precise: float
119120
is_segment_span: bool = False
120121

121122
def effective_parent_id(self):
@@ -193,7 +194,9 @@ def process_spans(self, spans: Sequence[Span], now: int):
193194
with self.client.pipeline(transaction=False) as p:
194195
for (project_and_trace, parent_span_id), subsegment in trees.items():
195196
set_key = f"span-buf:s:{{{project_and_trace}}}:{parent_span_id}"
196-
p.sadd(set_key, *[span.payload for span in subsegment])
197+
p.zadd(
198+
set_key, {span.payload: span.end_timestamp_precise for span in subsegment}
199+
)
197200

198201
p.execute()
199202

@@ -428,13 +431,13 @@ def _load_segment_data(self, segment_keys: list[SegmentKey]) -> dict[SegmentKey,
428431
with self.client.pipeline(transaction=False) as p:
429432
current_keys = []
430433
for key, cursor in cursors.items():
431-
p.sscan(key, cursor=cursor, count=self.segment_page_size)
434+
p.zscan(key, cursor=cursor, count=self.segment_page_size)
432435
current_keys.append(key)
433436

434437
results = p.execute()
435438

436-
for key, (cursor, spans) in zip(current_keys, results):
437-
sizes[key] += sum(len(span) for span in spans)
439+
for key, (cursor, zscan_values) in zip(current_keys, results):
440+
sizes[key] += sum(len(span) for span, _ in zscan_values)
438441
if sizes[key] > self.max_segment_bytes:
439442
metrics.incr("spans.buffer.flush_segments.segment_size_exceeded")
440443
logger.error("Skipping too large segment, byte size %s", sizes[key])
@@ -443,15 +446,7 @@ def _load_segment_data(self, segment_keys: list[SegmentKey]) -> dict[SegmentKey,
443446
del cursors[key]
444447
continue
445448

446-
payloads[key].extend(spans)
447-
if len(payloads[key]) > self.max_segment_spans:
448-
metrics.incr("spans.buffer.flush_segments.segment_span_count_exceeded")
449-
logger.error("Skipping too large segment, span count %s", len(payloads[key]))
450-
451-
del payloads[key]
452-
del cursors[key]
453-
continue
454-
449+
payloads[key].extend(span for span, _ in zscan_values)
455450
if cursor == 0:
456451
del cursors[key]
457452
else:

src/sentry/spans/consumers/process/factory.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import time
33
from collections.abc import Callable, Mapping
44
from functools import partial
5+
from typing import cast
56

67
import rapidjson
78
from arroyo.backends.kafka.consumer import KafkaPayload
@@ -10,6 +11,7 @@
1011
from arroyo.processing.strategies.commit import CommitOffsets
1112
from arroyo.processing.strategies.run_task import RunTask
1213
from arroyo.types import Commit, FilteredPayload, Message, Partition
14+
from sentry_kafka_schemas.schema_types.ingest_spans_v1 import SpanEvent
1315

1416
from sentry.spans.buffer import Span, SpansBuffer
1517
from sentry.spans.consumers.process.flusher import SpanFlusher
@@ -129,13 +131,14 @@ def process_batch(
129131
if min_timestamp is None or timestamp < min_timestamp:
130132
min_timestamp = timestamp
131133

132-
val = rapidjson.loads(payload.value)
134+
val = cast(SpanEvent, rapidjson.loads(payload.value))
133135
span = Span(
134136
trace_id=val["trace_id"],
135137
span_id=val["span_id"],
136138
parent_span_id=val.get("parent_span_id"),
137139
project_id=val["project_id"],
138140
payload=payload.value,
141+
end_timestamp_precise=val["end_timestamp_precise"],
139142
is_segment_span=bool(val.get("parent_span_id") is None or val.get("is_remote")),
140143
)
141144
spans.append(span)

tests/sentry/spans/consumers/process/test_consumer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def add_commit(offsets, force=False):
4141
"project_id": 12,
4242
"span_id": "a" * 16,
4343
"trace_id": "b" * 32,
44+
"end_timestamp_precise": 1700000000.0,
4445
}
4546
).encode("ascii"),
4647
[],
@@ -69,6 +70,7 @@ def add_commit(offsets, force=False):
6970
"segment_id": "aaaaaaaaaaaaaaaa",
7071
"span_id": "aaaaaaaaaaaaaaaa",
7172
"trace_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
73+
"end_timestamp_precise": 1700000000.0,
7274
},
7375
],
7476
}

tests/sentry/spans/consumers/process/test_flusher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,23 @@ def append(msg):
4444
span_id="a" * 16,
4545
parent_span_id="b" * 16,
4646
project_id=1,
47+
end_timestamp_precise=now,
4748
),
4849
Span(
4950
payload=_payload(b"d" * 16),
5051
trace_id=trace_id,
5152
span_id="d" * 16,
5253
parent_span_id="b" * 16,
5354
project_id=1,
55+
end_timestamp_precise=now,
5456
),
5557
Span(
5658
payload=_payload(b"c" * 16),
5759
trace_id=trace_id,
5860
span_id="c" * 16,
5961
parent_span_id="b" * 16,
6062
project_id=1,
63+
end_timestamp_precise=now,
6164
),
6265
Span(
6366
payload=_payload(b"b" * 16),
@@ -66,6 +69,7 @@ def append(msg):
6669
parent_span_id=None,
6770
is_segment_span=True,
6871
project_id=1,
72+
end_timestamp_precise=now,
6973
),
7074
]
7175

tests/sentry/spans/test_buffer.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,20 +123,23 @@ def process_spans(spans: Sequence[Span | _SplitBatch], buffer: SpansBuffer, now)
123123
span_id="a" * 16,
124124
parent_span_id="b" * 16,
125125
project_id=1,
126+
end_timestamp_precise=1700000000.0,
126127
),
127128
Span(
128129
payload=_payload(b"d" * 16),
129130
trace_id="a" * 32,
130131
span_id="d" * 16,
131132
parent_span_id="b" * 16,
132133
project_id=1,
134+
end_timestamp_precise=1700000000.0,
133135
),
134136
Span(
135137
payload=_payload(b"c" * 16),
136138
trace_id="a" * 32,
137139
span_id="c" * 16,
138140
parent_span_id="b" * 16,
139141
project_id=1,
142+
end_timestamp_precise=1700000000.0,
140143
),
141144
Span(
142145
payload=_payload(b"b" * 16),
@@ -145,6 +148,7 @@ def process_spans(spans: Sequence[Span | _SplitBatch], buffer: SpansBuffer, now)
145148
parent_span_id=None,
146149
is_segment_span=True,
147150
project_id=1,
151+
end_timestamp_precise=1700000000.0,
148152
),
149153
]
150154
)
@@ -188,6 +192,7 @@ def test_basic(buffer: SpansBuffer, spans):
188192
span_id="d" * 16,
189193
parent_span_id="b" * 16,
190194
project_id=1,
195+
end_timestamp_precise=1700000000.0,
191196
),
192197
_SplitBatch(),
193198
Span(
@@ -196,6 +201,7 @@ def test_basic(buffer: SpansBuffer, spans):
196201
span_id="b" * 16,
197202
parent_span_id="a" * 16,
198203
project_id=1,
204+
end_timestamp_precise=1700000000.0,
199205
),
200206
Span(
201207
payload=_payload(b"a" * 16),
@@ -204,13 +210,15 @@ def test_basic(buffer: SpansBuffer, spans):
204210
parent_span_id=None,
205211
is_segment_span=True,
206212
project_id=1,
213+
end_timestamp_precise=1700000000.0,
207214
),
208215
Span(
209216
payload=_payload(b"c" * 16),
210217
trace_id="a" * 32,
211218
span_id="c" * 16,
212219
parent_span_id="a" * 16,
213220
project_id=1,
221+
end_timestamp_precise=1700000000.0,
214222
),
215223
]
216224
)
@@ -254,27 +262,31 @@ def test_deep(buffer: SpansBuffer, spans):
254262
span_id="e" * 16,
255263
parent_span_id="d" * 16,
256264
project_id=1,
265+
end_timestamp_precise=1700000000.0,
257266
),
258267
Span(
259268
payload=_payload(b"d" * 16),
260269
trace_id="a" * 32,
261270
span_id="d" * 16,
262271
parent_span_id="b" * 16,
263272
project_id=1,
273+
end_timestamp_precise=1700000000.0,
264274
),
265275
Span(
266276
payload=_payload(b"b" * 16),
267277
trace_id="a" * 32,
268278
span_id="b" * 16,
269279
parent_span_id="c" * 16,
270280
project_id=1,
281+
end_timestamp_precise=1700000000.0,
271282
),
272283
Span(
273284
payload=_payload(b"c" * 16),
274285
trace_id="a" * 32,
275286
span_id="c" * 16,
276287
parent_span_id="a" * 16,
277288
project_id=1,
289+
end_timestamp_precise=1700000000.0,
278290
),
279291
Span(
280292
payload=_payload(b"a" * 16),
@@ -283,6 +295,7 @@ def test_deep(buffer: SpansBuffer, spans):
283295
parent_span_id=None,
284296
is_segment_span=True,
285297
project_id=1,
298+
end_timestamp_precise=1700000000.0,
286299
),
287300
]
288301
)
@@ -327,20 +340,23 @@ def test_deep2(buffer: SpansBuffer, spans):
327340
span_id="c" * 16,
328341
parent_span_id="b" * 16,
329342
project_id=1,
343+
end_timestamp_precise=1700000000.0,
330344
),
331345
Span(
332346
payload=_payload(b"d" * 16),
333347
trace_id="a" * 32,
334348
span_id="d" * 16,
335349
parent_span_id="b" * 16,
336350
project_id=1,
351+
end_timestamp_precise=1700000000.0,
337352
),
338353
Span(
339354
payload=_payload(b"e" * 16),
340355
trace_id="a" * 32,
341356
span_id="e" * 16,
342357
parent_span_id="b" * 16,
343358
project_id=1,
359+
end_timestamp_precise=1700000000.0,
344360
),
345361
Span(
346362
payload=_payload(b"b" * 16),
@@ -349,6 +365,7 @@ def test_deep2(buffer: SpansBuffer, spans):
349365
parent_span_id=None,
350366
is_segment_span=True,
351367
project_id=2,
368+
end_timestamp_precise=1700000000.0,
352369
),
353370
]
354371
)
@@ -400,20 +417,23 @@ def test_parent_in_other_project(buffer: SpansBuffer, spans):
400417
parent_span_id="d" * 16,
401418
project_id=1,
402419
is_segment_span=True,
420+
end_timestamp_precise=1700000000.0,
403421
),
404422
Span(
405423
payload=_payload(b"d" * 16),
406424
trace_id="a" * 32,
407425
span_id="d" * 16,
408426
parent_span_id="b" * 16,
409427
project_id=1,
428+
end_timestamp_precise=1700000000.0,
410429
),
411430
Span(
412431
payload=_payload(b"e" * 16),
413432
trace_id="a" * 32,
414433
span_id="e" * 16,
415434
parent_span_id="b" * 16,
416435
project_id=1,
436+
end_timestamp_precise=1700000000.0,
417437
),
418438
Span(
419439
payload=_payload(b"b" * 16),
@@ -422,6 +442,7 @@ def test_parent_in_other_project(buffer: SpansBuffer, spans):
422442
parent_span_id=None,
423443
is_segment_span=True,
424444
project_id=2,
445+
end_timestamp_precise=1700000000.0,
425446
),
426447
]
427448
),
@@ -476,6 +497,7 @@ def test_flush_rebalance(buffer: SpansBuffer):
476497
parent_span_id=None,
477498
project_id=1,
478499
is_segment_span=True,
500+
end_timestamp_precise=1700000000.0,
479501
)
480502
]
481503

0 commit comments

Comments
 (0)