Commit 3cb3fa8

fix(issues): Start processing new_group_first_seen in merge case (#7351)
This is part of an effort to fix issue search's sort-by-age feature. As part of that feature, we are adding the "first_seen" time of a group to each error. When we merge groups, we want to update the errors to carry the new group's "group_first_seen". We've decided to do that by explicitly passing the primary group's "first_seen" time along with the Kafka message. This PR starts ingesting that field properly; a follow-up PR in the `sentry` repo will start providing the field once this change has deployed everywhere.
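
For reference, a minimal sketch of an end_merge replacement payload after this change, using the field names exercised by the new tests in this commit. The timestamp strings are illustrative; the real format comes from snuba's settings.PAYLOAD_DATETIME_FORMAT.

    # Hypothetical end_merge message body, for illustration only.
    end_merge_body = {
        "transaction_id": "",
        "project_id": 100,
        "previous_group_ids": [2, 3],
        "new_group_id": 1,
        # Newly added and optional: the primary group's first_seen time.
        # Older producers omit it, and the consumer must tolerate that.
        "new_group_first_seen": "2023-05-01T12:00:00.000000Z",
        "datetime": "2023-05-02T12:00:00.000000Z",
    }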
1 parent e808038 commit 3cb3fa8

File tree

3 files changed: +145 −2 lines

requirements.txt

Lines changed: 1 addition & 1 deletion

@@ -28,7 +28,7 @@ python-dateutil==2.8.2
 python-rapidjson==1.8
 redis==4.5.4
 sentry-arroyo==2.29.1
-sentry-kafka-schemas==2.0.2
+sentry-kafka-schemas==2.0.4
 sentry-protos==0.3.3
 sentry-redis-tools==0.5.0
 sentry-relay==0.9.5
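
The only dependency change is this schema bump, which supplies the typed message body used by the new tests. A minimal sketch of what it enables, assuming (from this diff alone) that 2.0.4 is the release that added the optional key to the end_merge body:

    # EndMergeMessageBody is the generated type the tests below import; the
    # new key is optional, so .get() is the safe accessor for old payloads.
    from sentry_kafka_schemas.schema_types.events_v1 import EndMergeMessageBody

    def has_first_seen(body: EndMergeMessageBody) -> bool:
        return body.get("new_group_first_seen") is not None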

snuba/replacers/errors_replacer.py

Lines changed: 22 additions & 1 deletion

@@ -626,6 +626,7 @@ class MergeReplacement(Replacement):
     project_id: int
     previous_group_ids: Sequence[int]
     new_group_id: int
+    new_group_first_seen: datetime | None
     timestamp: datetime
 
     all_columns: Sequence[FlattenedColumn]
@@ -660,11 +661,22 @@ def parse_message(
         else:
             SEEN_MERGE_TXN_CACHE.append(txn)
 
+        # new_group_first_seen was added to the message schema; keep this check
+        # for backwards compatibility.
+        raw_new_group_first_seen = message.data.get("new_group_first_seen")
+        if raw_new_group_first_seen:
+            new_group_first_seen = datetime.strptime(
+                raw_new_group_first_seen, settings.PAYLOAD_DATETIME_FORMAT
+            )
+        else:
+            new_group_first_seen = None
+
         return cls(
             project_id=project_id,
             previous_group_ids=previous_group_ids,
             new_group_id=message.data["new_group_id"],
             timestamp=timestamp,
+            new_group_first_seen=new_group_first_seen,
             all_columns=context.all_columns,
         )
 
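A standalone sketch of the fallback above; PAYLOAD_FMT is a stand-in, since the real format string is settings.PAYLOAD_DATETIME_FORMAT:

    from datetime import datetime

    # Stand-in for snuba's settings.PAYLOAD_DATETIME_FORMAT (assumed shape).
    PAYLOAD_FMT = "%Y-%m-%dT%H:%M:%S.%fZ"

    def parse_first_seen(data: dict) -> datetime | None:
        # Old messages lack the key entirely; .get() keeps them parseable.
        raw = data.get("new_group_first_seen")
        return datetime.strptime(raw, PAYLOAD_FMT) if raw else None

    assert parse_first_seen({"new_group_id": 1}) is None
    assert parse_first_seen(
        {"new_group_first_seen": "2023-05-01T12:00:00.000000Z"}
    ) == datetime(2023, 5, 1, 12, 0)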

@@ -690,9 +702,18 @@ def get_count_query(self, table_name: str) -> Optional[str]:
     def get_insert_query(self, table_name: str) -> Optional[str]:
         all_column_names = [c.escaped for c in self.all_columns]
         all_columns = ", ".join(all_column_names)
+        replacement_columns = {"group_id": str(self.new_group_id)}
+
+        if self.new_group_first_seen is not None:
+            group_first_seen_str = self.new_group_first_seen.strftime(DATETIME_FORMAT)
+            replacement_columns[
+                "group_first_seen"
+            ] = f"CAST('{group_first_seen_str}' AS DateTime)"
+
         select_columns = ", ".join(
             map(
-                lambda i: i if i != "group_id" else str(self.new_group_id),
+                # Get i from replacement_columns; default to i if no replacement.
+                lambda i: replacement_columns.get(i, i),
                 all_column_names,
             )
         )
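
The effect of the new mapping, sketched with the four column names the tests below use. With new_group_first_seen set, both group_id and group_first_seen are rewritten in the SELECT list; otherwise group_first_seen simply selects itself, so the old value is copied through:

    all_column_names = ["organization_id", "group_id", "event_id", "group_first_seen"]

    # With a first_seen value (CAST string shown as the code above builds it):
    replacement_columns = {
        "group_id": "1",
        "group_first_seen": "CAST('2023-05-01 12:00:00' AS DateTime)",
    }
    assert ", ".join(
        replacement_columns.get(c, c) for c in all_column_names
    ) == "organization_id, 1, event_id, CAST('2023-05-01 12:00:00' AS DateTime)"

    # Without one, only group_id is replaced:
    replacement_columns = {"group_id": "1"}
    assert ", ".join(
        replacement_columns.get(c, c) for c in all_column_names
    ) == "organization_id, 1, event_id, group_first_seen"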
New test file

Lines changed: 122 additions & 0 deletions

@@ -0,0 +1,122 @@
+from datetime import datetime, timedelta
+from unittest import mock
+
+from sentry_kafka_schemas.schema_types.events_v1 import EndMergeMessageBody
+
+from snuba import settings
+from snuba.clickhouse import DATETIME_FORMAT as CLICKHOUSE_DATETIME_FORMAT
+from snuba.datasets.entities.entity_key import EntityKey
+from snuba.datasets.entities.factory import get_entity
+from snuba.datasets.schemas.tables import WritableTableSchema
+from snuba.processor import ReplacementType
+from snuba.replacers.errors_replacer import MergeReplacement, ReplacementContext
+from snuba.replacers.replacer_processor import (
+    ReplacementMessage,
+    ReplacementMessageMetadata,
+    ReplacerState,
+)
+from snuba.utils.schemas import FlattenedColumn
+
+
+def test_merge_replacement_with_first_seen() -> None:
+    now_dt = datetime.now()
+    first_seen_dt = now_dt - timedelta(days=1)
+    data: EndMergeMessageBody = {
+        "transaction_id": "",
+        "project_id": 100,
+        "previous_group_ids": [2, 3],
+        "new_group_id": 1,
+        "new_group_first_seen": first_seen_dt.strftime(
+            settings.PAYLOAD_DATETIME_FORMAT
+        ),
+        "datetime": now_dt.strftime(settings.PAYLOAD_DATETIME_FORMAT),
+    }
+
+    message = ReplacementMessage(
+        action_type=ReplacementType.END_MERGE,
+        data=data,
+        metadata=mock.Mock(ReplacementMessageMetadata),
+    )
+
+    columns = [
+        FlattenedColumn(None, c.name, c.type)
+        for c in (
+            get_entity(EntityKey.SEARCH_ISSUES)
+            .get_all_storages()[0]
+            .get_schema()
+            .get_columns()
+        ).columns
+        if c.name in {"organization_id", "group_id", "event_id", "group_first_seen"}
+    ]
+
+    context = ReplacementContext(
+        all_columns=columns,
+        required_columns=[],
+        state_name=ReplacerState.ERRORS,
+        tag_column_map={},
+        promoted_tags={},
+        schema=mock.Mock(WritableTableSchema),
+    )
+
+    mr = MergeReplacement.parse_message(message, context)
+    assert mr is not None
+    expected = f"""INSERT INTO table (organization_id, group_id, event_id, group_first_seen)
+SELECT organization_id, 1, event_id, CAST('{first_seen_dt.strftime(CLICKHOUSE_DATETIME_FORMAT)}' AS DateTime)
+FROM table FINAL
+PREWHERE group_id IN (2, 3)
+WHERE project_id = 100
+AND received <= CAST('{now_dt.strftime(CLICKHOUSE_DATETIME_FORMAT)}' AS DateTime)
+AND NOT deleted"""
+    actual = mr.get_insert_query("table")
+    assert actual is not None
+    assert actual.strip() == expected
+
+
+def test_merge_replacement_without_first_seen() -> None:
+    now_dt = datetime.now()
+    data: EndMergeMessageBody = {
+        "transaction_id": "",
+        "project_id": 100,
+        "previous_group_ids": [2, 3],
+        "new_group_id": 1,
+        "datetime": now_dt.strftime(settings.PAYLOAD_DATETIME_FORMAT),
+    }
+
+    message = ReplacementMessage(
+        action_type=ReplacementType.END_MERGE,
+        data=data,
+        metadata=mock.Mock(ReplacementMessageMetadata),
+    )
+
+    columns = [
+        FlattenedColumn(None, c.name, c.type)
+        for c in (
+            get_entity(EntityKey.SEARCH_ISSUES)
+            .get_all_storages()[0]
+            .get_schema()
+            .get_columns()
+        ).columns
+        if c.name in {"organization_id", "group_id", "event_id", "group_first_seen"}
+    ]
+
+    context = ReplacementContext(
+        all_columns=columns,
+        required_columns=[],
+        state_name=ReplacerState.ERRORS,
+        tag_column_map={},
+        promoted_tags={},
+        schema=mock.Mock(WritableTableSchema),
+    )
+
+    mr = MergeReplacement.parse_message(message, context)
+    assert mr is not None
+    expected = f"""INSERT INTO table (organization_id, group_id, event_id, group_first_seen)
+SELECT organization_id, 1, event_id, group_first_seen
+FROM table FINAL
+PREWHERE group_id IN (2, 3)
+WHERE project_id = 100
+AND received <= CAST('{now_dt.strftime(CLICKHOUSE_DATETIME_FORMAT)}' AS DateTime)
+AND NOT deleted"""
+    actual = mr.get_insert_query("table")
+    assert actual is not None
+    assert actual.strip() == expected
