Skip to content

Commit 04e0a8d

Browse files
feat(spans): Emitting outcomes when accepted_outcome_emitted is false (#109305)
Follow up to #109303 now that we know that the value arrives correctly ([see notebook](https://app.datadoghq.com/notebook/13946162/segment-outcome-emitted)). This now adds the logic for writing the outcome to the EAP `TraceItem`.
1 parent 722441c commit 04e0a8d

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

src/sentry/spans/consumers/process_segments/convert.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99
from sentry_protos.snuba.v1.trace_item_pb2 import (
1010
AnyValue,
1111
ArrayValue,
12+
CategoryCount,
1213
KeyValue,
1314
KeyValueList,
15+
Outcomes,
1416
TraceItem,
1517
)
1618

19+
from sentry.constants import DataCategory
1720
from sentry.spans.consumers.process_segments.types import CompatibleSpan
1821
from sentry.utils import metrics
1922
from sentry.utils.eap import hex_to_item_id
@@ -111,6 +114,17 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
111114
"spans.consumers.process_segments.outcome_emitted",
112115
tags={"already_emitted": str(span.get("accepted_outcome_emitted"))},
113116
)
117+
outcomes = None
118+
if span.get("accepted_outcome_emitted") is False:
119+
outcomes = Outcomes(
120+
key_id=int(span.get("key_id") or 0),
121+
category_count=[
122+
CategoryCount(
123+
data_category=int(DataCategory.SPAN_INDEXED),
124+
quantity=1,
125+
),
126+
],
127+
)
114128

115129
return TraceItem(
116130
organization_id=span["organization_id"],
@@ -125,6 +139,7 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
125139
retention_days=span["retention_days"],
126140
downsampled_retention_days=span.get("downsampled_retention_days", 0),
127141
received=_timestamp(span["received"]),
142+
outcomes=outcomes,
128143
)
129144

130145

tests/sentry/spans/consumers/process_segments/test_convert.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,20 @@
22
from typing import cast
33

44
import orjson
5+
import pytest
56
from google.protobuf.timestamp_pb2 import Timestamp
67
from sentry_kafka_schemas.schema_types.ingest_spans_v1 import SpanEvent
78
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
8-
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, ArrayValue, KeyValue, KeyValueList
9+
from sentry_protos.snuba.v1.trace_item_pb2 import (
10+
AnyValue,
11+
ArrayValue,
12+
CategoryCount,
13+
KeyValue,
14+
KeyValueList,
15+
Outcomes,
16+
)
917

18+
from sentry.constants import DataCategory
1019
from sentry.spans.consumers.process_segments.convert import RENAME_ATTRIBUTES, convert_span_to_item
1120
from sentry.spans.consumers.process_segments.types import CompatibleSpan
1221

@@ -250,3 +259,58 @@ def test_convert_renamed_attribute_meta() -> None:
250259
assert item.attributes.get("sentry._meta.fields.attributes.sentry.raw_description") == AnyValue(
251260
string_value=orjson.dumps({"meta": description_meta}).decode()
252261
)
262+
263+
264+
@pytest.mark.parametrize(
265+
"key_id, expected_key_id",
266+
[(123, 123), ("123", 123), (None, 0)],
267+
ids=["int_key_id", "str_key_id", "none_key_id"],
268+
)
269+
def test_convert_outcomes_when_not_emitted(key_id, expected_key_id) -> None:
270+
message: SpanEvent = copy.deepcopy(SPAN_KAFKA_MESSAGE)
271+
message["accepted_outcome_emitted"] = False
272+
message["key_id"] = key_id
273+
274+
item = convert_span_to_item(cast(CompatibleSpan, message))
275+
276+
assert item.HasField("outcomes")
277+
assert item.outcomes == Outcomes(
278+
key_id=expected_key_id,
279+
category_count=[
280+
CategoryCount(
281+
data_category=int(DataCategory.SPAN_INDEXED),
282+
quantity=1,
283+
),
284+
],
285+
)
286+
287+
288+
def test_convert_outcomes_when_not_emitted_missing_key_id() -> None:
289+
message: SpanEvent = copy.deepcopy(SPAN_KAFKA_MESSAGE)
290+
message["accepted_outcome_emitted"] = False
291+
292+
item = convert_span_to_item(cast(CompatibleSpan, message))
293+
294+
assert item.HasField("outcomes")
295+
assert item.outcomes == Outcomes(
296+
key_id=0,
297+
category_count=[
298+
CategoryCount(
299+
data_category=int(DataCategory.SPAN_INDEXED),
300+
quantity=1,
301+
),
302+
],
303+
)
304+
305+
306+
def test_convert_outcomes_when_already_emitted() -> None:
307+
message: SpanEvent = copy.deepcopy(SPAN_KAFKA_MESSAGE)
308+
message["accepted_outcome_emitted"] = True
309+
item = convert_span_to_item(cast(CompatibleSpan, message))
310+
assert not item.HasField("outcomes")
311+
312+
313+
def test_convert_outcomes_when_field_missing() -> None:
314+
message: SpanEvent = copy.deepcopy(SPAN_KAFKA_MESSAGE)
315+
item = convert_span_to_item(cast(CompatibleSpan, message))
316+
assert not item.HasField("outcomes")

0 commit comments

Comments
 (0)