Skip to content

Commit 087ef7f

Browse files
authored
feat(upsampling) - Error Upsampling on Metric Alerts under ACI (#97734)
- Converted count() aggregation to upsampled_count() for Entity Subscriptions on Events dataset for matching projects - Prevented "upsampled_count()" aggregate from being used directly on Detector creation
1 parent a4e82cf commit 087ef7f

File tree

4 files changed

+79
-1
lines changed

4 files changed

+79
-1
lines changed

src/sentry/snuba/entity_subscription.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from snuba_sdk import Column, Condition, Entity, Join, Op, Request
1212

1313
from sentry import features
14+
from sentry.api.helpers.error_upsampling import are_any_projects_error_upsampled
1415
from sentry.constants import CRASH_RATE_ALERT_AGGREGATE_ALIAS
1516
from sentry.exceptions import InvalidQuerySubscription, UnsupportedQuerySubscription
1617
from sentry.models.environment import Environment
@@ -206,10 +207,20 @@ def build_query_builder(
206207
query_builder_cls = ErrorsQueryBuilder
207208
parser_config_overrides.update(PARSER_CONFIG_OVERRIDES)
208209

210+
# Conditionally upsample error counts for allowlisted projects
211+
selected_aggregate = self.aggregate
212+
if (
213+
self.dataset == Dataset.Events
214+
and isinstance(selected_aggregate, str)
215+
and selected_aggregate.strip().lower() == "count()"
216+
and are_any_projects_error_upsampled(project_ids)
217+
):
218+
selected_aggregate = "upsampled_count()"
219+
209220
return query_builder_cls(
210221
dataset=Dataset(self.dataset.value),
211222
query=query,
212-
selected_columns=[self.aggregate],
223+
selected_columns=[selected_aggregate],
213224
params=params,
214225
offset=None,
215226
limit=None,

src/sentry/snuba/snuba_query_validator.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@ def __init__(self, *args, timeWindowSeconds=False, **kwargs):
108108
# TODO: only accept time_window in seconds once AlertRuleSerializer is removed
109109
self.time_window_seconds = timeWindowSeconds
110110

111+
def validate_aggregate(self, aggregate: str) -> str:
112+
"""
113+
Reject upsampled_count() as user input. This function is reserved for internal use
114+
and will be applied automatically when appropriate. Users should specify count().
115+
"""
116+
if aggregate == "upsampled_count()":
117+
raise serializers.ValidationError(
118+
"upsampled_count() is not allowed as user input. Use count() instead - "
119+
"it will be automatically converted to upsampled_count() when appropriate."
120+
)
121+
return aggregate
122+
111123
def validate_query_type(self, value: int) -> SnubaQuery.Type:
112124
try:
113125
return SnubaQuery.Type(value)

tests/sentry/snuba/test_entity_subscriptions.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,49 @@ def test_get_entity_subscription_for_events_dataset_with_join(self) -> None:
391391
Condition(Column("project_id", entity=g_entity), Op.IN, [self.project.id]),
392392
]
393393

394+
def test_events_subscription_count_upsampling_toggle(self) -> None:
395+
project = self.create_project(organization=self.organization)
396+
sub = get_entity_subscription(
397+
query_type=SnubaQuery.Type.ERROR,
398+
dataset=Dataset.Events,
399+
aggregate="count()",
400+
time_window=60,
401+
)
402+
403+
# Not allowlisted → expect plain count and no sample_weight
404+
with self.options({"issues.client_error_sampling.project_allowlist": []}):
405+
qb = sub.build_query_builder(query="", project_ids=[project.id], environment=None)
406+
req = qb.get_snql_query()
407+
assert len(req.query.select) == 1
408+
func = req.query.select[0]
409+
assert getattr(func, "alias", None) == "count"
410+
411+
# Allowlisted → expect full upsampled function structure
412+
with self.options({"issues.client_error_sampling.project_allowlist": [project.id]}):
413+
qb = sub.build_query_builder(query="", project_ids=[project.id], environment=None)
414+
req = qb.get_snql_query()
415+
assert len(req.query.select) == 1
416+
func = req.query.select[0]
417+
assert getattr(func, "alias", None) == "upsampled_count"
418+
# Expect: toInt64(sum(ifNull(sample_weight, 1))) structure
419+
assert isinstance(func, Function)
420+
assert func.function == "toInt64"
421+
assert len(func.parameters) == 1
422+
423+
outer_sum = func.parameters[0]
424+
assert isinstance(outer_sum, Function)
425+
assert outer_sum.function == "sum"
426+
assert len(outer_sum.parameters) == 1
427+
428+
inner_ifnull = outer_sum.parameters[0]
429+
assert isinstance(inner_ifnull, Function)
430+
assert inner_ifnull.function == "ifNull"
431+
assert len(inner_ifnull.parameters) == 2
432+
433+
weight_col = inner_ifnull.parameters[0]
434+
assert isinstance(weight_col, Column)
435+
assert weight_col.name == "sample_weight"
436+
394437
def test_get_entity_subscription_for_eap_rpc_query(self) -> None:
395438
aggregate = "count(span.duration)"
396439
query = "span.op:http.client"

tests/sentry/workflow_engine/endpoints/test_organization_detector_index.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,18 @@ def setUp(self) -> None:
652652
"workflowIds": [self.connected_workflow.id],
653653
}
654654

655+
def test_reject_upsampled_count_aggregate(self) -> None:
656+
"""Users should not be able to submit upsampled_count() directly in ACI."""
657+
data = {**self.valid_data}
658+
data["dataSource"] = {**self.valid_data["dataSource"], "aggregate": "upsampled_count()"}
659+
660+
response = self.get_error_response(
661+
self.organization.slug,
662+
**data,
663+
status_code=400,
664+
)
665+
assert "upsampled_count() is not allowed as user input" in str(response.data)
666+
655667
def test_missing_group_type(self) -> None:
656668
data = {**self.valid_data}
657669
del data["type"]

0 commit comments

Comments
 (0)