Skip to content

Commit af61b87

Browse files
jjbayer authored and loewenheim committed
ref(spans): Separate span tree enrichment from shimming (#98786)
Same as #98693 (which was reverted), but with a fix to prevent invalid `dict` creation. --------- Co-authored-by: Sebastian Zivota <[email protected]>
1 parent 7d3cedf commit af61b87

File tree

10 files changed

+254
-150
lines changed

10 files changed

+254
-150
lines changed

src/sentry/spans/consumers/process_segments/convert.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
99
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem
1010

11-
from sentry.spans.consumers.process_segments.enrichment import Span
11+
from sentry.spans.consumers.process_segments.types import CompatibleSpan
1212

1313
I64_MAX = 2**63 - 1
1414

@@ -31,7 +31,7 @@
3131
}
3232

3333

34-
def convert_span_to_item(span: Span) -> TraceItem:
34+
def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
3535
attributes: MutableMapping[str, AnyValue] = {} # TODO
3636

3737
client_sample_rate = 1.0

src/sentry/spans/consumers/process_segments/enrichment.py

Lines changed: 17 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from collections import defaultdict
2-
from typing import Any, NotRequired
2+
from collections.abc import Sequence
3+
from typing import Any
34

45
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SegmentSpan
56

67
from sentry.performance_issues.types import SentryTags as PerformanceIssuesSentryTags
8+
from sentry.spans.consumers.process_segments.types import EnrichedSpan, get_span_op
79

810
# Keys of shared sentry attributes that are shared across all spans in a segment. This list
911
# is taken from `extract_shared_tags` in Relay.
@@ -43,27 +45,6 @@
4345
DEFAULT_SPAN_OP = "default"
4446

4547

46-
class Span(SegmentSpan, total=True):
47-
"""
48-
Enriched version of the incoming span payload that has additional attributes
49-
extracted.
50-
"""
51-
52-
# Added in enrichment
53-
exclusive_time: float
54-
exclusive_time_ms: float
55-
op: str
56-
57-
sentry_tags: dict[str, Any] # type: ignore[misc] # XXX: fix w/ TypedDict extra_items once available
58-
59-
# Added by `SpanGroupingResults.write_to_spans` in `_enrich_spans`
60-
hash: NotRequired[str]
61-
62-
63-
def _get_span_op(span: SegmentSpan | Span) -> str:
64-
return span.get("data", {}).get("sentry.op") or DEFAULT_SPAN_OP
65-
66-
6748
def _find_segment_span(spans: list[SegmentSpan]) -> SegmentSpan | None:
6849
"""
6950
Finds the segment in the span in the list that has ``is_segment`` set to
@@ -83,7 +64,9 @@ def _find_segment_span(spans: list[SegmentSpan]) -> SegmentSpan | None:
8364
return None
8465

8566

86-
class Enricher:
67+
class TreeEnricher:
68+
"""Enriches spans with information from their parent, child and sibling spans."""
69+
8770
def __init__(self, spans: list[SegmentSpan]) -> None:
8871
self._segment_span = _find_segment_span(spans)
8972

@@ -172,41 +155,28 @@ def _exclusive_time(self, span: SegmentSpan) -> float:
172155

173156
return exclusive_time_us / 1_000
174157

175-
def enrich_span(self, span: SegmentSpan) -> Span:
158+
def enrich_span(self, span: SegmentSpan) -> EnrichedSpan:
176159
exclusive_time = self._exclusive_time(span)
177160
data = self._data(span)
178-
sentry_tags = self._sentry_tags(data)
179161
return {
180162
**span,
181-
# Creates attributes for EAP spans that are required by logic shared with the
182-
# event pipeline.
183-
#
184-
# Spans in the transaction event protocol had a slightly different schema
185-
# compared to raw spans on the EAP topic. This function adds the missing
186-
# attributes to the spans to make them compatible with the event pipeline
187-
# logic.
188163
"data": data,
189-
"sentry_tags": sentry_tags,
190-
"op": _get_span_op(span),
191-
# Note: Event protocol spans expect `exclusive_time` while EAP expects
192-
# `exclusive_time_ms`. Both are the same value in milliseconds
193-
"exclusive_time": exclusive_time,
194164
"exclusive_time_ms": exclusive_time,
195165
}
196166

197167
@classmethod
198-
def enrich_spans(cls, spans: list[SegmentSpan]) -> tuple[Span | None, list[Span]]:
168+
def enrich_spans(cls, spans: list[SegmentSpan]) -> tuple[int | None, list[EnrichedSpan]]:
199169
inst = cls(spans)
200170
ret = []
201-
segment_span = None
171+
segment_idx = None
202172

203-
for span in spans:
173+
for i, span in enumerate(spans):
204174
enriched = inst.enrich_span(span)
205175
if span is inst._segment_span:
206-
segment_span = enriched
176+
segment_idx = i
207177
ret.append(enriched)
208178

209-
return segment_span, ret
179+
return segment_idx, ret
210180

211181

212182
def _get_mobile_start_type(segment: SegmentSpan) -> str | None:
@@ -226,12 +196,12 @@ def _get_mobile_start_type(segment: SegmentSpan) -> str | None:
226196

227197
def _timestamp_by_op(spans: list[SegmentSpan], op: str) -> float | None:
228198
for span in spans:
229-
if _get_span_op(span) == op:
199+
if get_span_op(span) == op:
230200
return span["end_timestamp_precise"]
231201
return None
232202

233203

234-
def _span_interval(span: SegmentSpan | Span) -> tuple[int, int]:
204+
def _span_interval(span: SegmentSpan | EnrichedSpan) -> tuple[int, int]:
235205
"""Get the start and end timestamps of a span in microseconds."""
236206
return _us(span["start_timestamp_precise"]), _us(span["end_timestamp_precise"])
237207

@@ -243,7 +213,7 @@ def _us(timestamp: float) -> int:
243213

244214

245215
def compute_breakdowns(
246-
spans: list[Span],
216+
spans: Sequence[SegmentSpan],
247217
breakdowns_config: dict[str, dict[str, Any]],
248218
) -> dict[str, float]:
249219
"""
@@ -269,14 +239,14 @@ def compute_breakdowns(
269239
return ret
270240

271241

272-
def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, float]:
242+
def _compute_span_ops(spans: Sequence[SegmentSpan], config: Any) -> dict[str, float]:
273243
matches = config.get("matches")
274244
if not matches:
275245
return {}
276246

277247
intervals_by_op = defaultdict(list)
278248
for span in spans:
279-
op = _get_span_op(span)
249+
op = get_span_op(span)
280250
if operation_name := next(filter(lambda m: op.startswith(m), matches), None):
281251
intervals_by_op[operation_name].append(_span_interval(span))
282252

src/sentry/spans/consumers/process_segments/factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
from sentry import options
1818
from sentry.conf.types.kafka_definition import Topic
1919
from sentry.spans.consumers.process_segments.convert import convert_span_to_item
20-
from sentry.spans.consumers.process_segments.enrichment import Span
2120
from sentry.spans.consumers.process_segments.message import process_segment
21+
from sentry.spans.consumers.process_segments.types import CompatibleSpan
2222
from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing
2323
from sentry.utils.arroyo_producer import get_arroyo_producer
2424
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
@@ -135,7 +135,7 @@ def _process_message(
135135
raise InvalidMessage(message.value.partition, message.value.offset)
136136

137137

138-
def _serialize_payload(span: Span, timestamp: datetime | None) -> Value[KafkaPayload]:
138+
def _serialize_payload(span: CompatibleSpan, timestamp: datetime | None) -> Value[KafkaPayload]:
139139
item = convert_span_to_item(span)
140140
return Value(
141141
KafkaPayload(

src/sentry/spans/consumers/process_segments/message.py

Lines changed: 28 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import logging
22
import types
33
import uuid
4-
from copy import deepcopy
5-
from typing import Any, cast
4+
from collections.abc import Sequence
5+
from typing import cast
66

77
from django.core.exceptions import ValidationError
88
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SegmentSpan
@@ -26,7 +26,9 @@
2626
from sentry.receivers.features import record_generic_event_processed
2727
from sentry.receivers.onboarding import record_release_received
2828
from sentry.signals import first_insight_span_received, first_transaction_received
29-
from sentry.spans.consumers.process_segments.enrichment import Enricher, Span, compute_breakdowns
29+
from sentry.spans.consumers.process_segments.enrichment import TreeEnricher, compute_breakdowns
30+
from sentry.spans.consumers.process_segments.shim import build_shim_event_data, make_compatible
31+
from sentry.spans.consumers.process_segments.types import CompatibleSpan
3032
from sentry.spans.grouping.api import load_span_grouping_config
3133
from sentry.utils import metrics
3234
from sentry.utils.dates import to_datetime
@@ -39,7 +41,9 @@
3941

4042

4143
@metrics.wraps("spans.consumers.process_segments.process_segment")
42-
def process_segment(unprocessed_spans: list[SegmentSpan], skip_produce: bool = False) -> list[Span]:
44+
def process_segment(
45+
unprocessed_spans: list[SegmentSpan], skip_produce: bool = False
46+
) -> list[CompatibleSpan]:
4347
segment_span, spans = _enrich_spans(unprocessed_spans)
4448
if segment_span is None:
4549
return spans
@@ -69,7 +73,9 @@ def process_segment(unprocessed_spans: list[SegmentSpan], skip_produce: bool = F
6973

7074

7175
@metrics.wraps("spans.consumers.process_segments.enrich_spans")
72-
def _enrich_spans(unprocessed_spans: list[SegmentSpan]) -> tuple[Span | None, list[Span]]:
76+
def _enrich_spans(
77+
unprocessed_spans: list[SegmentSpan],
78+
) -> tuple[CompatibleSpan | None, list[CompatibleSpan]]:
7379
"""
7480
Enriches all spans with data derived from the span tree and the segment.
7581
@@ -80,7 +86,11 @@ def _enrich_spans(unprocessed_spans: list[SegmentSpan]) -> tuple[Span | None, li
8086
Returns the segment span, if any, and the list of enriched spans.
8187
"""
8288

83-
segment, spans = Enricher.enrich_spans(unprocessed_spans)
89+
segment_idx, tree_spans = TreeEnricher.enrich_spans(unprocessed_spans)
90+
91+
# Set attributes that are needed by logic shared with the event processing pipeline.
92+
spans = [make_compatible(span) for span in tree_spans]
93+
segment = spans[segment_idx] if segment_idx is not None else None
8494

8595
# Calculate grouping hashes for performance issue detection
8696
config = load_span_grouping_config()
@@ -91,14 +101,16 @@ def _enrich_spans(unprocessed_spans: list[SegmentSpan]) -> tuple[Span | None, li
91101

92102

93103
@metrics.wraps("spans.consumers.process_segments.compute_breakdowns")
94-
def _compute_breakdowns(segment: Span, spans: list[Span], project: Project) -> None:
104+
def _compute_breakdowns(
105+
segment: CompatibleSpan, spans: Sequence[CompatibleSpan], project: Project
106+
) -> None:
95107
config = project.get_option("sentry:breakdowns")
96108
breakdowns = compute_breakdowns(spans, config)
97109
segment.setdefault("data", {}).update(breakdowns)
98110

99111

100112
@metrics.wraps("spans.consumers.process_segments.create_models")
101-
def _create_models(segment: Span, project: Project) -> None:
113+
def _create_models(segment: CompatibleSpan, project: Project) -> None:
102114
"""
103115
Creates the Environment and Release models, along with the necessary
104116
relationships between them and the Project model.
@@ -144,11 +156,13 @@ def _create_models(segment: Span, project: Project) -> None:
144156

145157

146158
@metrics.wraps("spans.consumers.process_segments.detect_performance_problems")
147-
def _detect_performance_problems(segment_span: Span, spans: list[Span], project: Project) -> None:
159+
def _detect_performance_problems(
160+
segment_span: CompatibleSpan, spans: list[CompatibleSpan], project: Project
161+
) -> None:
148162
if not options.get("spans.process-segments.detect-performance-problems.enable"):
149163
return
150164

151-
event_data = _build_shim_event_data(segment_span, spans)
165+
event_data = build_shim_event_data(segment_span, spans)
152166
performance_problems = detect_performance_problems(event_data, project, standalone=True)
153167

154168
if not segment_span.get("_performance_issues_spans"):
@@ -191,55 +205,10 @@ def _detect_performance_problems(segment_span: Span, spans: list[Span], project:
191205
)
192206

193207

194-
def _build_shim_event_data(segment_span: Span, spans: list[Span]) -> dict[str, Any]:
195-
data = segment_span.get("data", {})
196-
197-
event: dict[str, Any] = {
198-
"type": "transaction",
199-
"level": "info",
200-
"contexts": {
201-
"trace": {
202-
"trace_id": segment_span["trace_id"],
203-
"type": "trace",
204-
"op": data.get("sentry.transaction.op"),
205-
"span_id": segment_span["span_id"],
206-
"hash": segment_span["hash"],
207-
},
208-
},
209-
"event_id": uuid.uuid4().hex,
210-
"project_id": segment_span["project_id"],
211-
"transaction": data.get("sentry.transaction"),
212-
"release": data.get("sentry.release"),
213-
"dist": data.get("sentry.dist"),
214-
"environment": data.get("sentry.environment"),
215-
"platform": data.get("sentry.platform"),
216-
"tags": [["environment", data.get("sentry.environment")]],
217-
"received": segment_span["received"],
218-
"timestamp": segment_span["end_timestamp_precise"],
219-
"start_timestamp": segment_span["start_timestamp_precise"],
220-
"datetime": to_datetime(segment_span["end_timestamp_precise"]).strftime(
221-
"%Y-%m-%dT%H:%M:%SZ"
222-
),
223-
"spans": [],
224-
}
225-
226-
if (profile_id := segment_span.get("profile_id")) is not None:
227-
event["contexts"]["profile"] = {"profile_id": profile_id, "type": "profile"}
228-
229-
# Add legacy span attributes required only by issue detectors. As opposed to
230-
# real event payloads, this also adds the segment span so detectors can run
231-
# topological sorting on the span tree.
232-
for span in spans:
233-
event_span = cast(dict[str, Any], deepcopy(span))
234-
event_span["start_timestamp"] = span["start_timestamp_precise"]
235-
event_span["timestamp"] = span["end_timestamp_precise"]
236-
event["spans"].append(event_span)
237-
238-
return event
239-
240-
241208
@metrics.wraps("spans.consumers.process_segments.record_signals")
242-
def _record_signals(segment_span: Span, spans: list[Span], project: Project) -> None:
209+
def _record_signals(
210+
segment_span: CompatibleSpan, spans: list[CompatibleSpan], project: Project
211+
) -> None:
243212
data = segment_span.get("data", {})
244213

245214
record_generic_event_processed(
@@ -271,7 +240,7 @@ def _record_signals(segment_span: Span, spans: list[Span], project: Project) ->
271240

272241

273242
@metrics.wraps("spans.consumers.process_segments.record_outcomes")
274-
def _track_outcomes(segment_span: Span, spans: list[Span]) -> None:
243+
def _track_outcomes(segment_span: CompatibleSpan, spans: list[CompatibleSpan]) -> None:
275244
if options.get("spans.process-segments.outcome-aggregator.enable"):
276245
outcome_aggregator.track_outcome_aggregated(
277246
org_id=segment_span["organization_id"],

0 commit comments

Comments (0)