Skip to content
134 changes: 133 additions & 1 deletion src/sentry/issues/escalating/escalating.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

from __future__ import annotations

import logging
from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Any, TypedDict

from django.db.models.signals import post_save
Expand All @@ -33,17 +34,24 @@
from sentry.models.group import Group, GroupStatus
from sentry.models.grouphistory import GroupHistoryStatus, record_group_history
from sentry.models.groupinbox import GroupInboxReason, InboxReasonDetails, add_group_to_inbox
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.search.eap.occurrences.common_queries import count_occurrences
from sentry.search.eap.occurrences.rollout_utils import EAPOccurrencesComparator
from sentry.search.eap.types import SearchResolverConfig
from sentry.search.events.types import SnubaParams
from sentry.services.eventstore.models import GroupEvent
from sentry.signals import issue_escalating
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.occurrences_rpc import Occurrences
from sentry.snuba.referrer import Referrer
from sentry.types.activity import ActivityType
from sentry.types.group import GroupSubStatus
from sentry.utils.cache import cache
from sentry.utils.snuba import raw_snql_query

logger = logging.getLogger(__name__)

__all__ = ["query_groups_past_counts", "parse_groups_past_counts"]

# The amount of data needed to generate a group forecast
Expand Down Expand Up @@ -78,6 +86,29 @@ def query_groups_past_counts(groups: Iterable[Group]) -> list[GroupsCountRespons
than 7 days old) will skew the optimization since we may only get one page and less elements than the max
ELEMENTS_PER_SNUBA_PAGE.
"""
groups_list = list(groups)
if not groups_list:
return []

snuba_results = _query_groups_past_counts_snuba(groups_list)
results = snuba_results

if EAPOccurrencesComparator.should_check_experiment(
"issues.escalating.query_groups_past_counts"
):
eap_results = _query_groups_past_counts_eap(groups_list)
results = EAPOccurrencesComparator.check_and_choose(
snuba_results,
eap_results,
"issues.escalating.query_groups_past_counts",
is_experimental_data_a_null_result=len(eap_results) == 0,
)
Comment on lines +100 to +105
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be helpful to include a reasonable match comparator or something along those lines. I think that we're likely to have small differences between the two systems just due to the fact that we don't ingest data at the same rate. It'd be good to have some kind of similarity % to compare even better

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't include a reasonable match comparator in this case (since it's a little more involved), but we have that on some of the other read paths we've migrated so far. Right now we're only issuing reads in S4S (where we've been writing to EAP for almost 3 months at this point, so we should be almost at the point where the two data stores are in parity for groups in retention).

Re increased latency by adding more queries: yep this is on my todo list. We've only migrated a handful of read paths over to double reads so far, but as we do more we'll just generally be adding latency across the app which is not ideal. I'm thinking that the best solution here is to switch to a sampling approach (where we double read only a portion of the time) rather than double reading on every query, so that we'll still get representative metrics for comparison but reduce the latency impact.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the sampling approach sounds decent. It's also not too hard to delegate to threads if you want to go down that path

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of data accuracy - historically it'll probably match up, but recently ingested data can be out of sync since there's no guarantee that writes to EAP are synced up with writes to snuba. It'll be a small discrepancy, but might be worth taking into account if you're seeing some queries not match up. Something you can handle later on, anyway


return results


def _query_groups_past_counts_snuba(groups: Sequence[Group]) -> list[GroupsCountResponse]:
"""Snuba implementation: Query hourly event counts for groups."""
all_results: list[GroupsCountResponse] = []
if not groups:
return all_results
Expand All @@ -99,6 +130,107 @@ def query_groups_past_counts(groups: Iterable[Group]) -> list[GroupsCountRespons
return all_results


def _query_groups_past_counts_eap(groups: Sequence[Group]) -> list[GroupsCountResponse]:
    """EAP implementation: query hourly event counts for the given groups.

    To keep result ordering in parity with the Snuba implementation (which
    returns error groups first, then issue-platform groups), the groups are
    partitioned by category and queried in two passes.
    """
    if not groups:
        return []

    # Error groups always participate; other categories only when their
    # issue type opts into escalation detection.
    error_groups = [g for g in groups if g.issue_category == GroupCategory.ERROR]
    other_groups = [
        g
        for g in groups
        if g.issue_category != GroupCategory.ERROR and g.issue_type.should_detect_escalation()
    ]

    results: list[GroupsCountResponse] = []
    for partition in (error_groups, other_groups):
        results.extend(_query_groups_eap_by_org(partition))
    return results


def _query_groups_eap_by_org(groups: Sequence[Group]) -> list[GroupsCountResponse]:
    """Query EAP for hourly event counts of ``groups``, batching by organization.

    Returns rows shaped like the Snuba implementation's output
    (``project_id``, ``group_id``, ``hourBucket``, ``count()``), sorted by
    (project_id, group_id, hourBucket) to match Snuba's ``orderby`` so the
    comparator's exact-match check stays informative. Organizations that no
    longer exist are skipped; per-organization query failures are logged and
    skipped rather than aborting the whole batch.
    """
    all_results: list[GroupsCountResponse] = []
    if not groups:
        return all_results

    start_date, end_date = _start_and_end_dates()

    groups_by_org = _extract_organization_and_project_and_group_ids(groups)

    # Fetch all organizations in one query up front instead of issuing an
    # Organization.objects.get() per organization inside the loop (N+1).
    organizations = Organization.objects.in_bulk(groups_by_org.keys())

    for organization_id in sorted(groups_by_org.keys()):
        group_ids_by_project = groups_by_org[organization_id]

        organization = organizations.get(organization_id)
        if organization is None:
            # Organization was deleted since the groups were collected.
            continue

        project_ids = list(group_ids_by_project.keys())
        all_group_ids = [gid for gids in group_ids_by_project.values() for gid in gids]
        projects = list(Project.objects.filter(id__in=project_ids))

        if not projects or not all_group_ids:
            continue

        # Build query string for the group_id filter; a single id needs no
        # parenthesized OR-chain.
        if len(all_group_ids) == 1:
            query_string = f"group_id:{all_group_ids[0]}"
        else:
            group_id_filter = " OR ".join([f"group_id:{gid}" for gid in all_group_ids])
            query_string = f"({group_id_filter})"

        snuba_params = SnubaParams(
            start=start_date,
            end=end_date,
            organization=organization,
            projects=projects,
            granularity_secs=HOUR,  # 1 hour buckets
        )

        try:
            timeseries_results = Occurrences.run_grouped_timeseries_query(
                params=snuba_params,
                query_string=query_string,
                y_axes=["count()"],
                groupby=["project_id", "group_id"],
                referrer=Referrer.ESCALATING_GROUPS.value,
                config=SearchResolverConfig(),
            )

            # Transform to the expected GroupsCountResponse format, dropping
            # empty buckets to match Snuba (which only returns non-zero rows).
            for row in timeseries_results:
                count = row.get("count()", 0)
                if count > 0:
                    # Bucket timestamps are epoch seconds; emit UTC ISO-8601
                    # strings like Snuba's hourBucket column.
                    bucket_dt = datetime.fromtimestamp(row["time"], tz=timezone.utc)
                    all_results.append(
                        {
                            "project_id": int(row["project_id"]),
                            "group_id": int(row["group_id"]),
                            "hourBucket": bucket_dt.isoformat(),
                            "count()": int(count),
                        }
                    )

        except Exception:
            # Best effort per organization: one org failing should not lose
            # counts for the others.
            logger.exception(
                "Fetching groups past counts from EAP failed",
                extra={
                    "organization_id": organization_id,
                    "project_ids": project_ids,
                    "group_ids": all_group_ids,
                },
            )
            continue

    # Sort to match Snuba's orderby: (project_id, group_id, hourBucket)
    all_results.sort(key=lambda x: (x["project_id"], x["group_id"], x["hourBucket"]))

    return all_results


def _process_groups(
groups: Sequence[Group],
start_date: datetime,
Expand Down
173 changes: 173 additions & 0 deletions tests/sentry/issues/escalating/test_escalating.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from sentry.issues.escalating.escalating import (
GroupsCountResponse,
_query_groups_past_counts_eap,
_start_and_end_dates,
get_group_hourly_count_eap,
get_group_hourly_count_snuba,
Expand Down Expand Up @@ -449,3 +450,175 @@ def test_uses_eap_count_as_source_of_truth(
assert result == (False, None)
mock_snuba.assert_called_once_with(group)
mock_eap.assert_called_once_with(group)


class TestQueryGroupsPastCountsEAP(TestCase):
    """Tests for the EAP double-read path of ``query_groups_past_counts``.

    Covers the comparator's source-of-truth selection (Snuba vs EAP) and the
    EAP implementation's transformation of raw timeseries rows into the
    ``GroupsCountResponse`` shape.

    NOTE: decorators are applied bottom-up, so in the double-patched tests the
    first mock argument is the *snuba* patch (listed last) and the second is
    the *eap* patch (listed first).
    """

    def test_empty_groups_returns_empty_list(self) -> None:
        # No groups means no Snuba/EAP queries and an empty result.
        result = query_groups_past_counts([])
        assert result == []

    @patch("sentry.snuba.occurrences_rpc.Occurrences.run_grouped_timeseries_query")
    def test_returns_empty_list_on_exception(self, mock_timeseries: mock.MagicMock) -> None:
        # EAP RPC failures are logged and swallowed, yielding an empty result
        # rather than propagating the exception.
        group = self.create_group()
        mock_timeseries.side_effect = Exception("RPC failed")

        result = _query_groups_past_counts_eap([group])
        assert result == []

    @patch("sentry.issues.escalating.escalating._query_groups_past_counts_eap")
    @patch("sentry.issues.escalating.escalating._query_groups_past_counts_snuba")
    def test_uses_snuba_result_as_source_of_truth(
        self, mock_snuba: mock.MagicMock, mock_eap: mock.MagicMock
    ) -> None:
        # With the experiment evaluating but the callsite NOT allowlisted,
        # both backends are queried but the Snuba result is returned.
        group = self.create_group()
        mock_snuba.return_value = [
            {
                "project_id": 1,
                "group_id": group.id,
                "hourBucket": "2025-01-05 10:00:00",
                "count()": 5,
            }
        ]
        mock_eap.return_value = [
            {
                "project_id": 2,
                "group_id": group.id,
                "hourBucket": "2025-01-05 11:00:00",
                "count()": 10,
            }
        ]

        with self.options({EAPOccurrencesComparator._should_eval_option_name(): True}):
            result = query_groups_past_counts([group])

        assert result == mock_snuba.return_value
        mock_snuba.assert_called_once()
        mock_eap.assert_called_once()

    @patch("sentry.issues.escalating.escalating._query_groups_past_counts_eap")
    @patch("sentry.issues.escalating.escalating._query_groups_past_counts_snuba")
    def test_uses_eap_result_as_source_of_truth(
        self, mock_snuba: mock.MagicMock, mock_eap: mock.MagicMock
    ) -> None:
        # Once the callsite is allowlisted, the EAP result wins even though
        # both backends are still queried for comparison.
        group = self.create_group()
        mock_snuba.return_value = [
            {
                "project_id": 1,
                "group_id": group.id,
                "hourBucket": "2025-01-05 10:00:00",
                "count()": 5,
            }
        ]
        mock_eap.return_value = [
            {
                "project_id": 2,
                "group_id": group.id,
                "hourBucket": "2025-01-05 11:00:00",
                "count()": 10,
            }
        ]

        with self.options(
            {
                EAPOccurrencesComparator._should_eval_option_name(): True,
                EAPOccurrencesComparator._callsite_allowlist_option_name(): [
                    "issues.escalating.query_groups_past_counts"
                ],
            }
        ):
            result = query_groups_past_counts([group])

        assert result == mock_eap.return_value
        mock_snuba.assert_called_once()
        mock_eap.assert_called_once()

    @patch("sentry.snuba.occurrences_rpc.Occurrences.run_grouped_timeseries_query")
    def test_eap_impl_transforms_results_correctly(self, mock_timeseries: mock.MagicMock) -> None:
        group = self.create_group()

        # Mock the timeseries response format: epoch-second buckets with
        # float counts, as returned by the RPC layer.
        mock_timeseries.return_value = [
            {
                "project_id": group.project_id,
                "group_id": group.id,
                "time": 1736074800,
                "count()": 5.0,
            },
            {
                "project_id": group.project_id,
                "group_id": group.id,
                "time": 1736078400,
                "count()": 3.0,
            },
        ]

        result = _query_groups_past_counts_eap([group])

        mock_timeseries.assert_called_once()
        assert len(result) == 2
        # Check first result: ints preserved, counts coerced to int, and the
        # epoch time converted into an hourBucket string.
        assert result[0]["project_id"] == group.project_id
        assert result[0]["group_id"] == group.id
        assert result[0]["count()"] == 5
        assert "hourBucket" in result[0]
        # Check second result
        assert result[1]["count()"] == 3

    @patch("sentry.snuba.occurrences_rpc.Occurrences.run_grouped_timeseries_query")
    def test_eap_impl_filters_zero_counts(self, mock_timeseries: mock.MagicMock) -> None:
        # Zero-count buckets are dropped to match Snuba, which only returns
        # rows with events.
        group = self.create_group()

        mock_timeseries.return_value = [
            {
                "project_id": group.project_id,
                "group_id": group.id,
                "time": 1736074800,
                "count()": 5.0,
            },
            {
                "project_id": group.project_id,
                "group_id": group.id,
                "time": 1736078400,
                "count()": 0.0,
            },
            {
                "project_id": group.project_id,
                "group_id": group.id,
                "time": 1736082000,
                "count()": 3.0,
            },
        ]

        result = _query_groups_past_counts_eap([group])

        # Zero count row should be filtered out
        assert len(result) == 2
        assert all(r["count()"] > 0 for r in result)

    @patch("sentry.snuba.occurrences_rpc.Occurrences.run_grouped_timeseries_query")
    def test_eap_impl_handles_multiple_groups(self, mock_timeseries: mock.MagicMock) -> None:
        group1 = self.create_group()
        group2 = self.create_group()

        mock_timeseries.return_value = [
            {
                "project_id": group1.project_id,
                "group_id": group1.id,
                "time": 1736074800,
                "count()": 5.0,
            },
            {
                "project_id": group2.project_id,
                "group_id": group2.id,
                "time": 1736074800,
                "count()": 10.0,
            },
        ]

        result = _query_groups_past_counts_eap([group1, group2])

        assert len(result) == 2
        # Results should be sorted by project_id, group_id, hourBucket
        group_ids_in_result = [r["group_id"] for r in result]
        assert group1.id in group_ids_in_result
        assert group2.id in group_ids_in_result
Loading