Skip to content

Commit 9e24416

Browse files
authored
feat(occurrences on eap): Implement double reads of bucketed group counts (#105691)
Adds EAP double-reading support for the `query_groups_past_counts` function used in escalating issue detection. Changes: - Implemented `_query_groups_past_counts_eap` that queries hourly event counts grouped by `project_id` and `group_id` - Updated `query_groups_past_counts` to use the double-read pattern with `EAPOccurrencesComparator` Ran some test queries to compare results on local devserver: ``` ============================================================ EAP Query Groups Past Counts - Manual Test ============================================================ ============================================================ Testing organization: Sentry (id=1) ============================================================ Found 19 groups to test Group IDs: [19, 18, 17, 16, 15, 14, 13, 12, 11, 10]... Error groups: 19, Other escalatable groups: 19 --- Testing Snuba implementation --- Snuba returned 4 results Sample result: {'project_id': 3, 'group_id': 8, 'hourBucket': '2026-01-05T23:00:00+00:00', 'count()': 11} --- Testing EAP implementation --- 23:53:57 [INFO] sentry.utils.snuba_rpc: Running a EndpointTimeSeries RPC query (rpc_query={'meta': {'organizationId': '1', 'referrer': 'sentry.issues.escalating', 'projectIds': ['1', '2', '3'], 'startTimestamp': '2025-12-29T23:00:00Z', 'endTimestamp': '2026-01-06T00:00:00Z', 'traceItemType': 'TRACE_ITEM_TYPE_OCCURRENCE', 'downsampledStorageConfig': {'mode': 'MODE_NORMAL'}}, 'filter': {'andFilter': {'filters': [{'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '19'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '18'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '17'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 
'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '12'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '11'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '10'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '9'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '8'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '16'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '15'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '14'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '13'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '2'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '7'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '6'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '5'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '4'}}}, {'orFilter': {'filters': [{'comparisonFilter': {'key': 
{'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '3'}}}, {'comparisonFilter': {'key': {'type': 'TYPE_INT', 'name': 'group_id'}, 'op': 'OP_EQUALS', 'value': {'valInt': '1'}}}]}}]}}]}}]}}]}}]}}]}}]}}]}}]}}]}}]}}]}}]}}]}}]}}]}}]}}, {'andFilter': {'filters': [{'comparisonFilter': {'key': {'type': 'TYPE_DOUBLE', 'name': 'sentry.timestamp'}, 'op': 'OP_GREATER_THAN_OR_EQUALS', 'value': {'valInt': '1767052437'}}}, {'comparisonFilter': {'key': {'type': 'TYPE_DOUBLE', 'name': 'sentry.timestamp'}, 'op': 'OP_LESS_THAN', 'value': {'valInt': '1767657237'}}}]}}]}}, 'granularitySecs': '3600', 'groupBy': [{'type': 'TYPE_INT', 'name': 'sentry.project_id'}, {'type': 'TYPE_INT', 'name': 'group_id'}], 'expressions': [{'aggregation': {'aggregate': 'FUNCTION_COUNT', 'key': {'type': 'TYPE_INT', 'name': 'sentry.project_id'}, 'label': 'count()', 'extrapolationMode': 'EXTRAPOLATION_MODE_SAMPLE_WEIGHTED'}, 'label': 'count()'}]} referrer='sentry.issues.escalating' organization_id=1 trace_item_type=7) 23:53:57 [INFO] sentry.utils.snuba_rpc: Timeseries RPC query response (rpc_rows=169 organization_id=1 meta=request_id: "e080d91837454cce94a73d6b9934fe2b" query_info { stats { progress_bytes: 1488 } metadata { } } downsampled_storage_meta { } ) EAP returned 4 results Sample result: {'project_id': 3, 'group_id': 8, 'hourBucket': '2026-01-05T23:00:00+00:00', 'count()': 11} --- Comparison --- Total Snuba results: 4 Total EAP results: 4 Matching entries: 4 Mismatching entries: 0 Only in Snuba: 0 Only in EAP: 0 --- Ordering Check --- ✅ Results are in the same order (exact match) --- Summary --- ✅ SUCCESS: Results match perfectly (data and ordering)! ```
1 parent d4c45d6 commit 9e24416

File tree

2 files changed

+306
-1
lines changed

2 files changed

+306
-1
lines changed

src/sentry/issues/escalating/escalating.py

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
from __future__ import annotations
66

7+
import logging
78
from collections import defaultdict
89
from collections.abc import Iterable, Mapping, Sequence
9-
from datetime import datetime, timedelta
10+
from datetime import datetime, timedelta, timezone
1011
from typing import Any, TypedDict
1112

1213
from django.db.models.signals import post_save
@@ -33,17 +34,24 @@
3334
from sentry.models.group import Group, GroupStatus
3435
from sentry.models.grouphistory import GroupHistoryStatus, record_group_history
3536
from sentry.models.groupinbox import GroupInboxReason, InboxReasonDetails, add_group_to_inbox
37+
from sentry.models.organization import Organization
38+
from sentry.models.project import Project
3639
from sentry.search.eap.occurrences.common_queries import count_occurrences
3740
from sentry.search.eap.occurrences.rollout_utils import EAPOccurrencesComparator
41+
from sentry.search.eap.types import SearchResolverConfig
42+
from sentry.search.events.types import SnubaParams
3843
from sentry.services.eventstore.models import GroupEvent
3944
from sentry.signals import issue_escalating
4045
from sentry.snuba.dataset import Dataset, EntityKey
46+
from sentry.snuba.occurrences_rpc import Occurrences
4147
from sentry.snuba.referrer import Referrer
4248
from sentry.types.activity import ActivityType
4349
from sentry.types.group import GroupSubStatus
4450
from sentry.utils.cache import cache
4551
from sentry.utils.snuba import raw_snql_query
4652

53+
logger = logging.getLogger(__name__)
54+
4755
__all__ = ["query_groups_past_counts", "parse_groups_past_counts"]
4856

4957
# The amount of data needed to generate a group forecast
@@ -78,6 +86,29 @@ def query_groups_past_counts(groups: Iterable[Group]) -> list[GroupsCountRespons
7886
than 7 days old) will skew the optimization since we may only get one page and less elements than the max
7987
ELEMENTS_PER_SNUBA_PAGE.
8088
"""
89+
groups_list = list(groups)
90+
if not groups_list:
91+
return []
92+
93+
snuba_results = _query_groups_past_counts_snuba(groups_list)
94+
results = snuba_results
95+
96+
if EAPOccurrencesComparator.should_check_experiment(
97+
"issues.escalating.query_groups_past_counts"
98+
):
99+
eap_results = _query_groups_past_counts_eap(groups_list)
100+
results = EAPOccurrencesComparator.check_and_choose(
101+
snuba_results,
102+
eap_results,
103+
"issues.escalating.query_groups_past_counts",
104+
is_experimental_data_a_null_result=len(eap_results) == 0,
105+
)
106+
107+
return results
108+
109+
110+
def _query_groups_past_counts_snuba(groups: Sequence[Group]) -> list[GroupsCountResponse]:
111+
"""Snuba implementation: Query hourly event counts for groups."""
81112
all_results: list[GroupsCountResponse] = []
82113
if not groups:
83114
return all_results
@@ -99,6 +130,107 @@ def query_groups_past_counts(groups: Iterable[Group]) -> list[GroupsCountRespons
99130
return all_results
100131

101132

133+
def _query_groups_past_counts_eap(groups: Sequence[Group]) -> list[GroupsCountResponse]:
    """EAP implementation: Query hourly event counts for groups.

    Mirrors the Snuba implementation's partitioning: error-category groups are
    always queried, while other categories are included only when their issue
    type opts into escalation detection.
    """
    group_list = list(groups)
    if not group_list:
        return []

    error_groups = [g for g in group_list if g.issue_category == GroupCategory.ERROR]
    escalatable_groups = [
        g
        for g in group_list
        if g.issue_category != GroupCategory.ERROR and g.issue_type.should_detect_escalation()
    ]

    # Query the two partitions in the same order as the Snuba path so the
    # double-read comparator sees comparable result ordering.
    return _query_groups_eap_by_org(error_groups) + _query_groups_eap_by_org(escalatable_groups)
151+
152+
153+
def _query_groups_eap_by_org(groups: Sequence[Group]) -> list[GroupsCountResponse]:
154+
"""Query EAP for groups, processing by organization."""
155+
all_results: list[GroupsCountResponse] = []
156+
if not groups:
157+
return all_results
158+
159+
start_date, end_date = _start_and_end_dates()
160+
161+
groups_by_org = _extract_organization_and_project_and_group_ids(groups)
162+
163+
for organization_id in sorted(groups_by_org.keys()):
164+
group_ids_by_project = groups_by_org[organization_id]
165+
166+
try:
167+
organization = Organization.objects.get(id=organization_id)
168+
except Organization.DoesNotExist:
169+
continue
170+
171+
project_ids = list(group_ids_by_project.keys())
172+
all_group_ids = [gid for gids in group_ids_by_project.values() for gid in gids]
173+
projects = list(Project.objects.filter(id__in=project_ids))
174+
175+
if not projects or not all_group_ids:
176+
continue
177+
178+
# Build query string for group_id filter
179+
if len(all_group_ids) == 1:
180+
query_string = f"group_id:{all_group_ids[0]}"
181+
else:
182+
group_id_filter = " OR ".join([f"group_id:{gid}" for gid in all_group_ids])
183+
query_string = f"({group_id_filter})"
184+
185+
snuba_params = SnubaParams(
186+
start=start_date,
187+
end=end_date,
188+
organization=organization,
189+
projects=projects,
190+
granularity_secs=HOUR, # 1 hour buckets
191+
)
192+
193+
try:
194+
timeseries_results = Occurrences.run_grouped_timeseries_query(
195+
params=snuba_params,
196+
query_string=query_string,
197+
y_axes=["count()"],
198+
groupby=["project_id", "group_id"],
199+
referrer=Referrer.ESCALATING_GROUPS.value,
200+
config=SearchResolverConfig(),
201+
)
202+
203+
# Transform to expected GroupsCountResponse format
204+
for row in timeseries_results:
205+
count = row.get("count()", 0)
206+
if count > 0:
207+
bucket_dt = datetime.fromtimestamp(row["time"], tz=timezone.utc)
208+
all_results.append(
209+
{
210+
"project_id": int(row["project_id"]),
211+
"group_id": int(row["group_id"]),
212+
"hourBucket": bucket_dt.isoformat(),
213+
"count()": int(count),
214+
}
215+
)
216+
217+
except Exception:
218+
logger.exception(
219+
"Fetching groups past counts from EAP failed",
220+
extra={
221+
"organization_id": organization_id,
222+
"project_ids": project_ids,
223+
"group_ids": all_group_ids,
224+
},
225+
)
226+
continue
227+
228+
# Sort to match Snuba's orderby: (project_id, group_id, hourBucket)
229+
all_results.sort(key=lambda x: (x["project_id"], x["group_id"], x["hourBucket"]))
230+
231+
return all_results
232+
233+
102234
def _process_groups(
103235
groups: Sequence[Group],
104236
start_date: datetime,

tests/sentry/issues/escalating/test_escalating.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from sentry.issues.escalating.escalating import (
1717
GroupsCountResponse,
18+
_query_groups_past_counts_eap,
1819
_start_and_end_dates,
1920
get_group_hourly_count_eap,
2021
get_group_hourly_count_snuba,
@@ -449,3 +450,175 @@ def test_uses_eap_count_as_source_of_truth(
449450
assert result == (False, None)
450451
mock_snuba.assert_called_once_with(group)
451452
mock_eap.assert_called_once_with(group)
453+
454+
455+
class TestQueryGroupsPastCountsEAP(TestCase):
    """Tests for the EAP double-read path of ``query_groups_past_counts``."""

    # Callsite name used by the comparator's allowlist option.
    _CALLSITE = "issues.escalating.query_groups_past_counts"

    @staticmethod
    def _rpc_row(project_id: int, group_id: int, ts: int, count: float) -> dict:
        """One fake grouped-timeseries row, shaped like the RPC response."""
        return {
            "project_id": project_id,
            "group_id": group_id,
            "time": ts,
            "count()": count,
        }

    def test_empty_groups_returns_empty_list(self) -> None:
        assert query_groups_past_counts([]) == []

    @patch("sentry.snuba.occurrences_rpc.Occurrences.run_grouped_timeseries_query")
    def test_returns_empty_list_on_exception(self, rpc_mock: mock.MagicMock) -> None:
        group = self.create_group()
        rpc_mock.side_effect = Exception("RPC failed")

        assert _query_groups_past_counts_eap([group]) == []

    @patch("sentry.issues.escalating.escalating._query_groups_past_counts_eap")
    @patch("sentry.issues.escalating.escalating._query_groups_past_counts_snuba")
    def test_uses_snuba_result_as_source_of_truth(
        self, snuba_mock: mock.MagicMock, eap_mock: mock.MagicMock
    ) -> None:
        group = self.create_group()
        snuba_rows = [
            {
                "project_id": 1,
                "group_id": group.id,
                "hourBucket": "2025-01-05 10:00:00",
                "count()": 5,
            }
        ]
        eap_rows = [
            {
                "project_id": 2,
                "group_id": group.id,
                "hourBucket": "2025-01-05 11:00:00",
                "count()": 10,
            }
        ]
        snuba_mock.return_value = snuba_rows
        eap_mock.return_value = eap_rows

        # Evaluation enabled but callsite not allowlisted: Snuba stays
        # authoritative even though both implementations run.
        with self.options({EAPOccurrencesComparator._should_eval_option_name(): True}):
            result = query_groups_past_counts([group])

        assert result == snuba_rows
        snuba_mock.assert_called_once()
        eap_mock.assert_called_once()

    @patch("sentry.issues.escalating.escalating._query_groups_past_counts_eap")
    @patch("sentry.issues.escalating.escalating._query_groups_past_counts_snuba")
    def test_uses_eap_result_as_source_of_truth(
        self, snuba_mock: mock.MagicMock, eap_mock: mock.MagicMock
    ) -> None:
        group = self.create_group()
        snuba_rows = [
            {
                "project_id": 1,
                "group_id": group.id,
                "hourBucket": "2025-01-05 10:00:00",
                "count()": 5,
            }
        ]
        eap_rows = [
            {
                "project_id": 2,
                "group_id": group.id,
                "hourBucket": "2025-01-05 11:00:00",
                "count()": 10,
            }
        ]
        snuba_mock.return_value = snuba_rows
        eap_mock.return_value = eap_rows

        # Allowlisting the callsite flips the source of truth to EAP.
        with self.options(
            {
                EAPOccurrencesComparator._should_eval_option_name(): True,
                EAPOccurrencesComparator._callsite_allowlist_option_name(): [self._CALLSITE],
            }
        ):
            result = query_groups_past_counts([group])

        assert result == eap_rows
        snuba_mock.assert_called_once()
        eap_mock.assert_called_once()

    @patch("sentry.snuba.occurrences_rpc.Occurrences.run_grouped_timeseries_query")
    def test_eap_impl_transforms_results_correctly(self, rpc_mock: mock.MagicMock) -> None:
        group = self.create_group()

        # Mock the timeseries response format.
        rpc_mock.return_value = [
            self._rpc_row(group.project_id, group.id, 1736074800, 5.0),
            self._rpc_row(group.project_id, group.id, 1736078400, 3.0),
        ]

        result = _query_groups_past_counts_eap([group])

        rpc_mock.assert_called_once()
        assert len(result) == 2
        first, second = result
        assert first["project_id"] == group.project_id
        assert first["group_id"] == group.id
        assert first["count()"] == 5
        assert "hourBucket" in first
        assert second["count()"] == 3

    @patch("sentry.snuba.occurrences_rpc.Occurrences.run_grouped_timeseries_query")
    def test_eap_impl_filters_zero_counts(self, rpc_mock: mock.MagicMock) -> None:
        group = self.create_group()

        rpc_mock.return_value = [
            self._rpc_row(group.project_id, group.id, 1736074800, 5.0),
            self._rpc_row(group.project_id, group.id, 1736078400, 0.0),
            self._rpc_row(group.project_id, group.id, 1736082000, 3.0),
        ]

        result = _query_groups_past_counts_eap([group])

        # The zero-count bucket must be dropped from the output.
        assert len(result) == 2
        assert all(row["count()"] > 0 for row in result)

    @patch("sentry.snuba.occurrences_rpc.Occurrences.run_grouped_timeseries_query")
    def test_eap_impl_handles_multiple_groups(self, rpc_mock: mock.MagicMock) -> None:
        group1 = self.create_group()
        group2 = self.create_group()

        rpc_mock.return_value = [
            self._rpc_row(group1.project_id, group1.id, 1736074800, 5.0),
            self._rpc_row(group2.project_id, group2.id, 1736074800, 10.0),
        ]

        result = _query_groups_past_counts_eap([group1, group2])

        assert len(result) == 2
        # Results are sorted by (project_id, group_id, hourBucket); both
        # groups must be represented.
        returned_group_ids = [row["group_id"] for row in result]
        assert group1.id in returned_group_ids
        assert group2.id in returned_group_ids

0 commit comments

Comments
 (0)