Skip to content

Commit def9c98

Browse files
jjbayer authored and armenzg committed
ref(spans): Separate span tree enrichment from shimming (#98693)
This PR contains no functional changes. Its only purpose is to clearly separate two enrichment steps in the segment consumer: 1. Enrich spans with information from the span tree (inherit from segment, exclusive time from children, etc.) 2. Shim the Span V1 format to be compatible with the events processing pipeline, for logic that is shared between the two pipelines.
1 parent 5f7c215 commit def9c98

File tree

9 files changed

+230
-150
lines changed

9 files changed

+230
-150
lines changed

src/sentry/spans/consumers/process_segments/convert.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
99
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem
1010

11-
from sentry.spans.consumers.process_segments.enrichment import Span
11+
from sentry.spans.consumers.process_segments.types import CompatibleSpan
1212

1313
I64_MAX = 2**63 - 1
1414

@@ -31,7 +31,7 @@
3131
}
3232

3333

34-
def convert_span_to_item(span: Span) -> TraceItem:
34+
def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
3535
attributes: MutableMapping[str, AnyValue] = {} # TODO
3636

3737
client_sample_rate = 1.0

src/sentry/spans/consumers/process_segments/enrichment.py

Lines changed: 17 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from collections import defaultdict
2-
from typing import Any, NotRequired
2+
from collections.abc import Sequence
3+
from typing import Any
34

45
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SegmentSpan
56

67
from sentry.performance_issues.types import SentryTags as PerformanceIssuesSentryTags
8+
from sentry.spans.consumers.process_segments.types import TreeSpan, get_span_op
79

810
# Keys of shared sentry attributes that are shared across all spans in a segment. This list
911
# is taken from `extract_shared_tags` in Relay.
@@ -43,27 +45,6 @@
4345
DEFAULT_SPAN_OP = "default"
4446

4547

46-
class Span(SegmentSpan, total=True):
47-
"""
48-
Enriched version of the incoming span payload that has additional attributes
49-
extracted.
50-
"""
51-
52-
# Added in enrichment
53-
exclusive_time: float
54-
exclusive_time_ms: float
55-
op: str
56-
57-
sentry_tags: dict[str, Any] # type: ignore[misc] # XXX: fix w/ TypedDict extra_items once available
58-
59-
# Added by `SpanGroupingResults.write_to_spans` in `_enrich_spans`
60-
hash: NotRequired[str]
61-
62-
63-
def _get_span_op(span: SegmentSpan | Span) -> str:
64-
return span.get("data", {}).get("sentry.op") or DEFAULT_SPAN_OP
65-
66-
6748
def _find_segment_span(spans: list[SegmentSpan]) -> SegmentSpan | None:
6849
"""
6950
Finds the segment in the span in the list that has ``is_segment`` set to
@@ -83,7 +64,9 @@ def _find_segment_span(spans: list[SegmentSpan]) -> SegmentSpan | None:
8364
return None
8465

8566

86-
class Enricher:
67+
class TreeEnricher:
68+
"""Enriches spans with information from their parent, child and sibling spans."""
69+
8770
def __init__(self, spans: list[SegmentSpan]) -> None:
8871
self._segment_span = _find_segment_span(spans)
8972

@@ -172,41 +155,28 @@ def _exclusive_time(self, span: SegmentSpan) -> float:
172155

173156
return exclusive_time_us / 1_000
174157

175-
def enrich_span(self, span: SegmentSpan) -> Span:
158+
def enrich_span(self, span: SegmentSpan) -> TreeSpan:
176159
exclusive_time = self._exclusive_time(span)
177160
data = self._data(span)
178-
sentry_tags = self._sentry_tags(data)
179161
return {
180162
**span,
181-
# Creates attributes for EAP spans that are required by logic shared with the
182-
# event pipeline.
183-
#
184-
# Spans in the transaction event protocol had a slightly different schema
185-
# compared to raw spans on the EAP topic. This function adds the missing
186-
# attributes to the spans to make them compatible with the event pipeline
187-
# logic.
188163
"data": data,
189-
"sentry_tags": sentry_tags,
190-
"op": _get_span_op(span),
191-
# Note: Event protocol spans expect `exclusive_time` while EAP expects
192-
# `exclusive_time_ms`. Both are the same value in milliseconds
193-
"exclusive_time": exclusive_time,
194164
"exclusive_time_ms": exclusive_time,
195165
}
196166

197167
@classmethod
198-
def enrich_spans(cls, spans: list[SegmentSpan]) -> tuple[Span | None, list[Span]]:
168+
def enrich_spans(cls, spans: list[SegmentSpan]) -> tuple[int | None, list[TreeSpan]]:
199169
inst = cls(spans)
200170
ret = []
201-
segment_span = None
171+
segment_idx = None
202172

203-
for span in spans:
173+
for i, span in enumerate(spans):
204174
enriched = inst.enrich_span(span)
205175
if span is inst._segment_span:
206-
segment_span = enriched
176+
segment_idx = i
207177
ret.append(enriched)
208178

209-
return segment_span, ret
179+
return segment_idx, ret
210180

211181

212182
def _get_mobile_start_type(segment: SegmentSpan) -> str | None:
@@ -226,12 +196,12 @@ def _get_mobile_start_type(segment: SegmentSpan) -> str | None:
226196

227197
def _timestamp_by_op(spans: list[SegmentSpan], op: str) -> float | None:
228198
for span in spans:
229-
if _get_span_op(span) == op:
199+
if get_span_op(span) == op:
230200
return span["end_timestamp_precise"]
231201
return None
232202

233203

234-
def _span_interval(span: SegmentSpan | Span) -> tuple[int, int]:
204+
def _span_interval(span: SegmentSpan | TreeSpan) -> tuple[int, int]:
235205
"""Get the start and end timestamps of a span in microseconds."""
236206
return _us(span["start_timestamp_precise"]), _us(span["end_timestamp_precise"])
237207

@@ -243,7 +213,7 @@ def _us(timestamp: float) -> int:
243213

244214

245215
def compute_breakdowns(
246-
spans: list[Span],
216+
spans: Sequence[SegmentSpan],
247217
breakdowns_config: dict[str, dict[str, Any]],
248218
) -> dict[str, float]:
249219
"""
@@ -269,14 +239,14 @@ def compute_breakdowns(
269239
return ret
270240

271241

272-
def _compute_span_ops(spans: list[Span], config: Any) -> dict[str, float]:
242+
def _compute_span_ops(spans: Sequence[SegmentSpan], config: Any) -> dict[str, float]:
273243
matches = config.get("matches")
274244
if not matches:
275245
return {}
276246

277247
intervals_by_op = defaultdict(list)
278248
for span in spans:
279-
op = _get_span_op(span)
249+
op = get_span_op(span)
280250
if operation_name := next(filter(lambda m: op.startswith(m), matches), None):
281251
intervals_by_op[operation_name].append(_span_interval(span))
282252

src/sentry/spans/consumers/process_segments/factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
from sentry import options
1818
from sentry.conf.types.kafka_definition import Topic
1919
from sentry.spans.consumers.process_segments.convert import convert_span_to_item
20-
from sentry.spans.consumers.process_segments.enrichment import Span
2120
from sentry.spans.consumers.process_segments.message import process_segment
21+
from sentry.spans.consumers.process_segments.types import CompatibleSpan
2222
from sentry.utils.arroyo import MultiprocessingPool, run_task_with_multiprocessing
2323
from sentry.utils.arroyo_producer import get_arroyo_producer
2424
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
@@ -135,7 +135,7 @@ def _process_message(
135135
raise InvalidMessage(message.value.partition, message.value.offset)
136136

137137

138-
def _serialize_payload(span: Span, timestamp: datetime | None) -> Value[KafkaPayload]:
138+
def _serialize_payload(span: CompatibleSpan, timestamp: datetime | None) -> Value[KafkaPayload]:
139139
item = convert_span_to_item(span)
140140
return Value(
141141
KafkaPayload(

src/sentry/spans/consumers/process_segments/message.py

Lines changed: 28 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import logging
22
import types
33
import uuid
4-
from copy import deepcopy
5-
from typing import Any, cast
4+
from collections.abc import Sequence
5+
from typing import cast
66

77
from django.core.exceptions import ValidationError
88
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SegmentSpan
@@ -26,7 +26,9 @@
2626
from sentry.receivers.features import record_generic_event_processed
2727
from sentry.receivers.onboarding import record_release_received
2828
from sentry.signals import first_insight_span_received, first_transaction_received
29-
from sentry.spans.consumers.process_segments.enrichment import Enricher, Span, compute_breakdowns
29+
from sentry.spans.consumers.process_segments.enrichment import TreeEnricher, compute_breakdowns
30+
from sentry.spans.consumers.process_segments.shim import build_shim_event_data, make_compatible
31+
from sentry.spans.consumers.process_segments.types import CompatibleSpan
3032
from sentry.spans.grouping.api import load_span_grouping_config
3133
from sentry.utils import metrics
3234
from sentry.utils.dates import to_datetime
@@ -39,7 +41,9 @@
3941

4042

4143
@metrics.wraps("spans.consumers.process_segments.process_segment")
42-
def process_segment(unprocessed_spans: list[SegmentSpan], skip_produce: bool = False) -> list[Span]:
44+
def process_segment(
45+
unprocessed_spans: list[SegmentSpan], skip_produce: bool = False
46+
) -> list[CompatibleSpan]:
4347
segment_span, spans = _enrich_spans(unprocessed_spans)
4448
if segment_span is None:
4549
return spans
@@ -69,7 +73,9 @@ def process_segment(unprocessed_spans: list[SegmentSpan], skip_produce: bool = F
6973

7074

7175
@metrics.wraps("spans.consumers.process_segments.enrich_spans")
72-
def _enrich_spans(unprocessed_spans: list[SegmentSpan]) -> tuple[Span | None, list[Span]]:
76+
def _enrich_spans(
77+
unprocessed_spans: list[SegmentSpan],
78+
) -> tuple[CompatibleSpan | None, list[CompatibleSpan]]:
7379
"""
7480
Enriches all spans with data derived from the span tree and the segment.
7581
@@ -80,7 +86,11 @@ def _enrich_spans(unprocessed_spans: list[SegmentSpan]) -> tuple[Span | None, li
8086
Returns the segment span, if any, and the list of enriched spans.
8187
"""
8288

83-
segment, spans = Enricher.enrich_spans(unprocessed_spans)
89+
segment_idx, tree_spans = TreeEnricher.enrich_spans(unprocessed_spans)
90+
91+
# Set attributes that are needed by logic shared with the event processing pipeline
92+
spans = [make_compatible(span) for span in tree_spans]
93+
segment = spans[segment_idx] if segment_idx is not None else None
8494

8595
# Calculate grouping hashes for performance issue detection
8696
config = load_span_grouping_config()
@@ -91,14 +101,16 @@ def _enrich_spans(unprocessed_spans: list[SegmentSpan]) -> tuple[Span | None, li
91101

92102

93103
@metrics.wraps("spans.consumers.process_segments.compute_breakdowns")
94-
def _compute_breakdowns(segment: Span, spans: list[Span], project: Project) -> None:
104+
def _compute_breakdowns(
105+
segment: CompatibleSpan, spans: Sequence[CompatibleSpan], project: Project
106+
) -> None:
95107
config = project.get_option("sentry:breakdowns")
96108
breakdowns = compute_breakdowns(spans, config)
97109
segment.setdefault("data", {}).update(breakdowns)
98110

99111

100112
@metrics.wraps("spans.consumers.process_segments.create_models")
101-
def _create_models(segment: Span, project: Project) -> None:
113+
def _create_models(segment: CompatibleSpan, project: Project) -> None:
102114
"""
103115
Creates the Environment and Release models, along with the necessary
104116
relationships between them and the Project model.
@@ -144,11 +156,13 @@ def _create_models(segment: Span, project: Project) -> None:
144156

145157

146158
@metrics.wraps("spans.consumers.process_segments.detect_performance_problems")
147-
def _detect_performance_problems(segment_span: Span, spans: list[Span], project: Project) -> None:
159+
def _detect_performance_problems(
160+
segment_span: CompatibleSpan, spans: list[CompatibleSpan], project: Project
161+
) -> None:
148162
if not options.get("spans.process-segments.detect-performance-problems.enable"):
149163
return
150164

151-
event_data = _build_shim_event_data(segment_span, spans)
165+
event_data = build_shim_event_data(segment_span, spans)
152166
performance_problems = detect_performance_problems(event_data, project, standalone=True)
153167

154168
if not segment_span.get("_performance_issues_spans"):
@@ -191,55 +205,10 @@ def _detect_performance_problems(segment_span: Span, spans: list[Span], project:
191205
)
192206

193207

194-
def _build_shim_event_data(segment_span: Span, spans: list[Span]) -> dict[str, Any]:
195-
data = segment_span.get("data", {})
196-
197-
event: dict[str, Any] = {
198-
"type": "transaction",
199-
"level": "info",
200-
"contexts": {
201-
"trace": {
202-
"trace_id": segment_span["trace_id"],
203-
"type": "trace",
204-
"op": data.get("sentry.transaction.op"),
205-
"span_id": segment_span["span_id"],
206-
"hash": segment_span["hash"],
207-
},
208-
},
209-
"event_id": uuid.uuid4().hex,
210-
"project_id": segment_span["project_id"],
211-
"transaction": data.get("sentry.transaction"),
212-
"release": data.get("sentry.release"),
213-
"dist": data.get("sentry.dist"),
214-
"environment": data.get("sentry.environment"),
215-
"platform": data.get("sentry.platform"),
216-
"tags": [["environment", data.get("sentry.environment")]],
217-
"received": segment_span["received"],
218-
"timestamp": segment_span["end_timestamp_precise"],
219-
"start_timestamp": segment_span["start_timestamp_precise"],
220-
"datetime": to_datetime(segment_span["end_timestamp_precise"]).strftime(
221-
"%Y-%m-%dT%H:%M:%SZ"
222-
),
223-
"spans": [],
224-
}
225-
226-
if (profile_id := segment_span.get("profile_id")) is not None:
227-
event["contexts"]["profile"] = {"profile_id": profile_id, "type": "profile"}
228-
229-
# Add legacy span attributes required only by issue detectors. As opposed to
230-
# real event payloads, this also adds the segment span so detectors can run
231-
# topological sorting on the span tree.
232-
for span in spans:
233-
event_span = cast(dict[str, Any], deepcopy(span))
234-
event_span["start_timestamp"] = span["start_timestamp_precise"]
235-
event_span["timestamp"] = span["end_timestamp_precise"]
236-
event["spans"].append(event_span)
237-
238-
return event
239-
240-
241208
@metrics.wraps("spans.consumers.process_segments.record_signals")
242-
def _record_signals(segment_span: Span, spans: list[Span], project: Project) -> None:
209+
def _record_signals(
210+
segment_span: CompatibleSpan, spans: list[CompatibleSpan], project: Project
211+
) -> None:
243212
data = segment_span.get("data", {})
244213

245214
record_generic_event_processed(
@@ -271,7 +240,7 @@ def _record_signals(segment_span: Span, spans: list[Span], project: Project) ->
271240

272241

273242
@metrics.wraps("spans.consumers.process_segments.record_outcomes")
274-
def _track_outcomes(segment_span: Span, spans: list[Span]) -> None:
243+
def _track_outcomes(segment_span: CompatibleSpan, spans: list[CompatibleSpan]) -> None:
275244
if options.get("spans.process-segments.outcome-aggregator.enable"):
276245
outcome_aggregator.track_outcome_aggregated(
277246
org_id=segment_span["organization_id"],

0 commit comments

Comments (0)