diff --git a/pyproject.toml b/pyproject.toml
index 4bad84187ee..ae1c301bb96 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -173,6 +173,7 @@ module = [
     "sentry.api.event_search",
     "sentry.api.helpers.deprecation",
     "sentry.api.helpers.environments",
+    "sentry.api.helpers.error_upsampling",
     "sentry.api.helpers.group_index.delete",
     "sentry.api.helpers.group_index.update",
     "sentry.api.helpers.source_map_helper",
@@ -460,6 +461,7 @@ module = [
     "tests.sentry.api.endpoints.issues.test_organization_derive_code_mappings",
     "tests.sentry.api.endpoints.test_browser_reporting_collector",
     "tests.sentry.api.endpoints.test_project_repo_path_parsing",
+    "tests.sentry.api.helpers.test_error_upsampling",
    "tests.sentry.audit_log.services.*",
     "tests.sentry.deletions.test_group",
     "tests.sentry.event_manager.test_event_manager",
diff --git a/src/sentry/api/endpoints/organization_events_stats.py b/src/sentry/api/endpoints/organization_events_stats.py
index 558919085b0..eda773272bd 100644
--- a/src/sentry/api/endpoints/organization_events_stats.py
+++ b/src/sentry/api/endpoints/organization_events_stats.py
@@ -11,6 +11,10 @@
 from sentry.api.api_publish_status import ApiPublishStatus
 from sentry.api.base import region_silo_endpoint
 from sentry.api.bases import OrganizationEventsV2EndpointBase
+from sentry.api.helpers.error_upsampling import (
+    is_errors_query_for_error_upsampled_projects,
+    transform_query_columns_for_error_upsampling,
+)
 from sentry.constants import MAX_TOP_EVENTS
 from sentry.models.dashboard_widget import DashboardWidget, DashboardWidgetTypes
 from sentry.models.organization import Organization
@@ -117,7 +121,7 @@ def get(self, request: Request, organization: Organization) -> Response:
                 status=400,
             )
         elif top_events <= 0:
-            return Response({"detail": "If topEvents needs to be at least 1"}, status=400)
+            return Response({"detail": "topEvents needs to be at least 1"}, status=400)
 
         comparison_delta = None
         if "comparisonDelta" in request.GET:
@@ -211,12 +215,26 @@ def _get_event_stats(
             zerofill_results: bool,
             comparison_delta: timedelta | None,
         ) -> SnubaTSResult | dict[str, SnubaTSResult]:
+            # Decide once whether error upsampling applies; the helper caches
+            # the allowlist lookup, so repeated checks stay cheap.
+            upsampling_enabled = is_errors_query_for_error_upsampled_projects(
+                snuba_params, organization, dataset, request
+            )
+
+            # Swap count() for upsampled_count() up front so the top-events,
+            # RPC, and standard timeseries paths all query the same columns.
+            final_columns = (
+                transform_query_columns_for_error_upsampling(query_columns)
+                if upsampling_enabled
+                else query_columns
+            )
+
             if top_events > 0:
                 if use_rpc:
                     return scoped_dataset.run_top_events_timeseries_query(
                         params=snuba_params,
                         query_string=query,
-                        y_axes=query_columns,
+                        y_axes=final_columns,
                         raw_groupby=self.get_field_list(organization, request),
                         orderby=self.get_orderby(request),
                         limit=top_events,
@@ -231,7 +249,7 @@ def _get_event_stats(
                     equations=self.get_equation_list(organization, request),
                 )
                 return scoped_dataset.top_events_timeseries(
-                    timeseries_columns=query_columns,
+                    timeseries_columns=final_columns,
                     selected_columns=self.get_field_list(organization, request),
                     equations=self.get_equation_list(organization, request),
                     user_query=query,
@@ -252,10 +270,10 @@ def _get_event_stats(
                 )
 
             if use_rpc:
                 return scoped_dataset.run_timeseries_query(
                     params=snuba_params,
                     query_string=query,
-                    y_axes=query_columns,
+                    y_axes=final_columns,
                     referrer=referrer,
                     config=SearchResolverConfig(
                         auto_fields=False,
@@ -267,8 +285,8 @@ def _get_event_stats(
                     comparison_delta=comparison_delta,
                 )
 
             return scoped_dataset.timeseries_query(
-                selected_columns=query_columns,
+                selected_columns=final_columns,
                 query=query,
                 snuba_params=snuba_params,
                 rollup=rollup,
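Note: only `count()` is rewritten by the helper; every other yAxis (e.g. `count_unique(user)`) passes through unchanged, which is why the transform can be applied once up front. A minimal standalone sketch of that behavior (illustrative only, not the Sentry implementation):

```python
# Illustrative sketch of the column rewrite performed by
# transform_query_columns_for_error_upsampling (defined in the next file).
def transform(columns: list[str]) -> list[str]:
    # Only count() is swapped for the upsampled aggregate; all other
    # aggregates and equations pass through untouched.
    return [
        "upsampled_count() as count" if c.lower().strip() == "count()" else c
        for c in columns
    ]

assert transform(["count()", "count_unique(user)"]) == [
    "upsampled_count() as count",
    "count_unique(user)",
]
```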
diff --git a/src/sentry/api/helpers/error_upsampling.py b/src/sentry/api/helpers/error_upsampling.py
new file mode 100644
index 00000000000..413dfb300a0
--- /dev/null
+++ b/src/sentry/api/helpers/error_upsampling.py
@@ -0,0 +1,129 @@
+from collections.abc import Sequence
+from types import ModuleType
+
+from rest_framework.request import Request
+
+from sentry import options
+from sentry.models.organization import Organization
+from sentry.search.events.types import SnubaParams
+from sentry.utils.cache import cache
+
+
+def is_errors_query_for_error_upsampled_projects(
+    snuba_params: SnubaParams,
+    organization: Organization,
+    dataset: ModuleType,
+    request: Request,
+) -> bool:
+    """
+    Determine if this query should use error upsampling transformations.
+    Only applies when ALL projects are allowlisted and we're querying error events.
+
+    Performance optimization: allowlist eligibility is cached for 60 seconds to
+    avoid repeated option lookups during high-traffic periods. This is safe
+    because allowlist changes are infrequent and eventual consistency is acceptable.
+    """
+    cache_key = f"error_upsampling_eligible:{organization.id}:{hash(tuple(sorted(snuba_params.project_ids)))}"
+
+    # Check the cache first to skip the option lookup entirely
+    cached_result = cache.get(cache_key)
+    if cached_result is not None:
+        return cached_result and _should_apply_sample_weight_transform(dataset, request)
+
+    # Cache miss - perform a fresh allowlist check
+    is_eligible = _are_all_projects_error_upsampled(snuba_params.project_ids, organization)
+
+    # Cache for 60 seconds to absorb traffic spikes
+    cache.set(cache_key, is_eligible, 60)
+
+    return is_eligible and _should_apply_sample_weight_transform(dataset, request)
+ """ + if not project_ids: + return False + + allowlist = options.get("issues.client_error_sampling.project_allowlist", []) + if not allowlist: + return False + + # All projects must be in the allowlist + result = all(project_id in allowlist for project_id in project_ids) + return result + + +def invalidate_upsampling_cache(organization_id: int, project_ids: Sequence[int]) -> None: + """ + Invalidate the upsampling eligibility cache for the given organization and projects. + This should be called when the allowlist configuration changes to ensure + cache consistency across the system. + """ + cache_key = f"error_upsampling_eligible:{organization_id}:{hash(tuple(sorted(project_ids)))}" + cache.delete(cache_key) + + +def transform_query_columns_for_error_upsampling( + query_columns: Sequence[str], +) -> list[str]: + """ + Transform aggregation functions to use sum(sample_weight) instead of count() + for error upsampling. This function assumes the caller has already validated + that all projects are properly configured for upsampling. + + Note: We rely on the database schema to ensure sample_weight exists for all + events in allowlisted projects, so no additional null checks are needed here. + """ + transformed_columns = [] + for column in query_columns: + column_lower = column.lower().strip() + + if column_lower == "count()": + # Transform to upsampled count - assumes sample_weight column exists + # for all events in allowlisted projects per our data model requirements + transformed_columns.append("upsampled_count() as count") + + else: + transformed_columns.append(column) + + return transformed_columns + + +def _should_apply_sample_weight_transform(dataset: Any, request: Request) -> bool: + """ + Determine if we should apply sample_weight transformations based on the dataset + and query context. Only apply for error events since sample_weight doesn't exist + for transactions. + """ + from sentry.snuba import discover, errors + + # Always apply for the errors dataset + if dataset == errors: + return True + + from sentry.snuba import transactions + + # Never apply for the transactions dataset + if dataset == transactions: + return False + + # For the discover dataset, check if we're querying errors specifically + if dataset == discover: + result = _is_error_focused_query(request) + return result + + # For other datasets (spans, metrics, etc.), don't apply + return False + + +def _is_error_focused_query(request: Request) -> bool: + """ + Check if a query is focused on error events. + Reduced to only check for event.type:error to err on the side of caution. 
+ """ + query = request.GET.get("query", "").lower() + + if "event.type:error" in query: + return True + + return False diff --git a/src/sentry/search/events/datasets/discover.py b/src/sentry/search/events/datasets/discover.py index 18d742d5825..b720b2d9897 100644 --- a/src/sentry/search/events/datasets/discover.py +++ b/src/sentry/search/events/datasets/discover.py @@ -1038,6 +1038,18 @@ def function_converter(self) -> Mapping[str, SnQLFunction]: default_result_type="integer", private=True, ), + SnQLFunction( + "upsampled_count", + required_args=[], + # Optimized aggregation for error upsampling - assumes sample_weight + # exists for all events in allowlisted projects as per schema design + snql_aggregate=lambda args, alias: Function( + "toInt64", + [Function("sum", [Column("sample_weight")])], + alias, + ), + default_result_type="number", + ), ] } diff --git a/src/sentry/testutils/factories.py b/src/sentry/testutils/factories.py index 5636f6608bc..98a15e92912 100644 --- a/src/sentry/testutils/factories.py +++ b/src/sentry/testutils/factories.py @@ -8,7 +8,7 @@ import zipfile from base64 import b64encode from binascii import hexlify -from collections.abc import Mapping, Sequence +from collections.abc import Mapping, MutableMapping, Sequence from datetime import UTC, datetime from enum import Enum from hashlib import sha1 @@ -341,6 +341,22 @@ def _patch_artifact_manifest(path, org=None, release=None, project=None, extra_f return orjson.dumps(manifest).decode() +def _set_sample_rate_from_error_sampling(normalized_data: MutableMapping[str, Any]) -> None: + """Set 'sample_rate' on normalized_data if contexts.error_sampling.client_sample_rate is present and valid.""" + client_sample_rate = None + try: + client_sample_rate = ( + normalized_data.get("contexts", {}).get("error_sampling", {}).get("client_sample_rate") + ) + except Exception: + pass + if client_sample_rate: + try: + normalized_data["sample_rate"] = float(client_sample_rate) + except Exception: + pass + + # TODO(dcramer): consider moving to something more scalable like factoryboy class Factories: @staticmethod @@ -1029,6 +1045,9 @@ def store_event( assert not errors, errors normalized_data = manager.get_data() + + _set_sample_rate_from_error_sampling(normalized_data) + event = None # When fingerprint is present on transaction, inject performance problems diff --git a/tests/sentry/api/helpers/test_error_upsampling.py b/tests/sentry/api/helpers/test_error_upsampling.py new file mode 100644 index 00000000000..426f7f6014e --- /dev/null +++ b/tests/sentry/api/helpers/test_error_upsampling.py @@ -0,0 +1,101 @@ +from unittest.mock import Mock, patch + +from django.http import QueryDict +from django.test import RequestFactory +from rest_framework.request import Request + +from sentry.api.helpers.error_upsampling import ( + _are_all_projects_error_upsampled, + _is_error_focused_query, + _should_apply_sample_weight_transform, + transform_query_columns_for_error_upsampling, +) +from sentry.models.organization import Organization +from sentry.search.events.types import SnubaParams +from sentry.snuba import discover, errors, transactions +from sentry.testutils.cases import TestCase + + +class ErrorUpsamplingTest(TestCase): + def setUp(self) -> None: + self.organization = Organization.objects.create(name="test-org") + self.projects = [ + self.create_project(organization=self.organization, name="Project 1"), + self.create_project(organization=self.organization, name="Project 2"), + self.create_project(organization=self.organization, name="Project 
3"), + ] + self.project_ids = [p.id for p in self.projects] + self.snuba_params = SnubaParams( + start=None, + end=None, + projects=self.projects, + ) + factory = RequestFactory() + self.request = Request(factory.get("/")) + self.request.GET = QueryDict("") + + @patch("sentry.api.helpers.error_upsampling.options") + def test_are_all_projects_error_upsampled(self, mock_options: Mock) -> None: + # Test when all projects are allowlisted + mock_options.get.return_value = self.project_ids + assert _are_all_projects_error_upsampled(self.project_ids, self.organization) is True + + # Test when some projects are not allowlisted + mock_options.get.return_value = self.project_ids[:-1] + assert _are_all_projects_error_upsampled(self.project_ids, self.organization) is False + + # Test when no projects are allowlisted + mock_options.get.return_value = [] + assert _are_all_projects_error_upsampled(self.project_ids, self.organization) is False + + # Test when no project IDs provided + assert _are_all_projects_error_upsampled([], self.organization) is False + + def test_transform_query_columns_for_error_upsampling(self) -> None: + # Test count() transformation + columns = ["count()", "other_column"] + expected = [ + "upsampled_count() as count", + "other_column", + ] + assert transform_query_columns_for_error_upsampling(columns) == expected + + # Test case insensitivity + columns = ["COUNT()"] + expected = [ + "upsampled_count() as count", + ] + assert transform_query_columns_for_error_upsampling(columns) == expected + + # Test whitespace handling + columns = [" count() "] + expected = [ + "upsampled_count() as count", + ] + assert transform_query_columns_for_error_upsampling(columns) == expected + + def test_is_error_focused_query(self) -> None: + # Test explicit error type + self.request.GET = QueryDict("query=event.type:error") + assert _is_error_focused_query(self.request) is True + + # Test explicit transaction type + self.request.GET = QueryDict("query=event.type:transaction") + assert _is_error_focused_query(self.request) is False + + # Test empty query + self.request.GET = QueryDict("") + assert _is_error_focused_query(self.request) is False + + def test_should_apply_sample_weight_transform(self) -> None: + # Test errors dataset + assert _should_apply_sample_weight_transform(errors, self.request) is True + + # Test transactions dataset + assert _should_apply_sample_weight_transform(transactions, self.request) is False + + self.request.GET = QueryDict("query=event.type:error") + assert _should_apply_sample_weight_transform(discover, self.request) is True + + self.request.GET = QueryDict("query=event.type:transaction") + assert _should_apply_sample_weight_transform(discover, self.request) is False diff --git a/tests/snuba/api/endpoints/test_organization_events_stats.py b/tests/snuba/api/endpoints/test_organization_events_stats.py index 71c12586339..2007e93b008 100644 --- a/tests/snuba/api/endpoints/test_organization_events_stats.py +++ b/tests/snuba/api/endpoints/test_organization_events_stats.py @@ -3549,3 +3549,174 @@ def test_top_events_with_error_unhandled(self): data = response.data assert response.status_code == 200, response.content assert len(data) == 2 + + +class OrganizationEventsStatsErrorUpsamplingTest(APITestCase, SnubaTestCase): + endpoint = "sentry-api-0-organization-events-stats" + + def setUp(self): + super().setUp() + self.login_as(user=self.user) + self.authed_user = self.user + + self.day_ago = before_now(days=1).replace(hour=10, minute=0, second=0, microsecond=0) + + self.project 
diff --git a/tests/sentry/api/helpers/test_error_upsampling.py b/tests/sentry/api/helpers/test_error_upsampling.py
new file mode 100644
index 00000000000..426f7f6014e
--- /dev/null
+++ b/tests/sentry/api/helpers/test_error_upsampling.py
@@ -0,0 +1,101 @@
+from unittest.mock import Mock, patch
+
+from django.http import QueryDict
+from django.test import RequestFactory
+from rest_framework.request import Request
+
+from sentry.api.helpers.error_upsampling import (
+    _are_all_projects_error_upsampled,
+    _is_error_focused_query,
+    _should_apply_sample_weight_transform,
+    transform_query_columns_for_error_upsampling,
+)
+from sentry.models.organization import Organization
+from sentry.search.events.types import SnubaParams
+from sentry.snuba import discover, errors, transactions
+from sentry.testutils.cases import TestCase
+
+
+class ErrorUpsamplingTest(TestCase):
+    def setUp(self) -> None:
+        self.organization = Organization.objects.create(name="test-org")
+        self.projects = [
+            self.create_project(organization=self.organization, name="Project 1"),
+            self.create_project(organization=self.organization, name="Project 2"),
+            self.create_project(organization=self.organization, name="Project 3"),
+        ]
+        self.project_ids = [p.id for p in self.projects]
+        self.snuba_params = SnubaParams(
+            start=None,
+            end=None,
+            projects=self.projects,
+        )
+        factory = RequestFactory()
+        self.request = Request(factory.get("/"))
+        self.request.GET = QueryDict("")
+
+    @patch("sentry.api.helpers.error_upsampling.options")
+    def test_are_all_projects_error_upsampled(self, mock_options: Mock) -> None:
+        # Test when all projects are allowlisted
+        mock_options.get.return_value = self.project_ids
+        assert _are_all_projects_error_upsampled(self.project_ids, self.organization) is True
+
+        # Test when some projects are not allowlisted
+        mock_options.get.return_value = self.project_ids[:-1]
+        assert _are_all_projects_error_upsampled(self.project_ids, self.organization) is False
+
+        # Test when no projects are allowlisted
+        mock_options.get.return_value = []
+        assert _are_all_projects_error_upsampled(self.project_ids, self.organization) is False
+
+        # Test when no project IDs provided
+        assert _are_all_projects_error_upsampled([], self.organization) is False
+
+    def test_transform_query_columns_for_error_upsampling(self) -> None:
+        # Test count() transformation
+        columns = ["count()", "other_column"]
+        expected = [
+            "upsampled_count() as count",
+            "other_column",
+        ]
+        assert transform_query_columns_for_error_upsampling(columns) == expected
+
+        # Test case insensitivity
+        columns = ["COUNT()"]
+        expected = [
+            "upsampled_count() as count",
+        ]
+        assert transform_query_columns_for_error_upsampling(columns) == expected
+
+        # Test whitespace handling
+        columns = [" count() "]
+        expected = [
+            "upsampled_count() as count",
+        ]
+        assert transform_query_columns_for_error_upsampling(columns) == expected
+
+    def test_is_error_focused_query(self) -> None:
+        # Test explicit error type
+        self.request.GET = QueryDict("query=event.type:error")
+        assert _is_error_focused_query(self.request) is True
+
+        # Test explicit transaction type
+        self.request.GET = QueryDict("query=event.type:transaction")
+        assert _is_error_focused_query(self.request) is False
+
+        # Test empty query
+        self.request.GET = QueryDict("")
+        assert _is_error_focused_query(self.request) is False
+
+    def test_should_apply_sample_weight_transform(self) -> None:
+        # Test errors dataset
+        assert _should_apply_sample_weight_transform(errors, self.request) is True
+
+        # Test transactions dataset
+        assert _should_apply_sample_weight_transform(transactions, self.request) is False
+
+        self.request.GET = QueryDict("query=event.type:error")
+        assert _should_apply_sample_weight_transform(discover, self.request) is True
+
+        self.request.GET = QueryDict("query=event.type:transaction")
+        assert _should_apply_sample_weight_transform(discover, self.request) is False
"op": "test", # operation name, can be any string + }, + }, + }, + project_id=self.project.id, + ) + + response = self.client.get( + self.url, + data={ + "start": self.day_ago.isoformat(), + "end": (self.day_ago + timedelta(hours=2)).isoformat(), + "interval": "1h", + "yAxis": "count()", + "query": "event.type:transaction", + "project": [self.project.id, self.project2.id], + "dataset": "discover", + }, + format="json", + ) + + assert response.status_code == 200, response.content + data = response.data["data"] + assert len(data) == 2 # Two time buckets + # Should use regular count() for transactions + assert data[0][1][0]["count"] == 1 + assert data[1][1][0]["count"] == 0 + + @mock.patch("sentry.api.helpers.error_upsampling.options") + def test_error_upsampling_with_no_allowlisted_projects(self, mock_options): + # Set up no allowlisted projects + mock_options.get.return_value = [] + + response = self.client.get( + self.url, + data={ + "start": self.day_ago.isoformat(), + "end": (self.day_ago + timedelta(hours=2)).isoformat(), + "interval": "1h", + "yAxis": "count()", + "query": "event.type:error", + "project": [self.project.id, self.project2.id], + }, + format="json", + ) + + assert response.status_code == 200, response.content + data = response.data["data"] + assert len(data) == 2 # Two time buckets + # Should use regular count() since no projects are allowlisted + assert data[0][1][0]["count"] == 1 + assert data[1][1][0]["count"] == 1