diff --git a/src/sentry/snuba/occurrences_rpc.py b/src/sentry/snuba/occurrences_rpc.py
index ec36a6d5754b3f..20c0a4424457b3 100644
--- a/src/sentry/snuba/occurrences_rpc.py
+++ b/src/sentry/snuba/occurrences_rpc.py
@@ -1,4 +1,6 @@
 import logging
+from datetime import timedelta
+from typing import Any
 
 import sentry_sdk
 from sentry_protos.snuba.v1.request_common_pb2 import PageToken
@@ -6,9 +8,12 @@
 from sentry.search.eap.columns import ColumnDefinitions, ResolvedAttribute
 from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS
 from sentry.search.eap.resolver import SearchResolver
+from sentry.search.eap.sampling import events_meta_from_rpc_request_meta
 from sentry.search.eap.types import AdditionalQueries, EAPResponse, SearchResolverConfig
-from sentry.search.events.types import SAMPLING_MODES, SnubaParams
+from sentry.search.events.types import SAMPLING_MODES, EventsMeta, SnubaParams
 from sentry.snuba import rpc_dataset_common
+from sentry.snuba.discover import zerofill
+from sentry.utils.snuba import SnubaTSResult, process_value
 
 logger = logging.getLogger(__name__)
 
@@ -111,3 +116,171 @@ def run_table_query_with_tags(
             ),
             params.debug,
         )
+
+    @classmethod
+    @sentry_sdk.trace
+    def run_timeseries_query(
+        cls,
+        *,
+        params: SnubaParams,
+        query_string: str,
+        y_axes: list[str],
+        referrer: str,
+        config: SearchResolverConfig,
+        sampling_mode: SAMPLING_MODES | None,
+        comparison_delta: timedelta | None = None,
+        additional_queries: AdditionalQueries | None = None,
+    ) -> SnubaTSResult:
+        """Run a simple timeseries query (no groupby)."""
+        cls.validate_granularity(params)
+        search_resolver = cls.get_resolver(params, config)
+        rpc_request, aggregates, groupbys = cls.get_timeseries_query(
+            search_resolver=search_resolver,
+            params=params,
+            query_string=query_string,
+            y_axes=y_axes,
+            groupby=[],
+            referrer=referrer,
+            sampling_mode=sampling_mode,
+            additional_queries=additional_queries,
+        )
+
+        """Run the query"""
+        rpc_response = cls._run_timeseries_rpc(params.debug, rpc_request)
+
+        """Process the results"""
+        result = rpc_dataset_common.ProcessedTimeseries()
+        final_meta: EventsMeta = events_meta_from_rpc_request_meta(rpc_response.meta)
+        for resolved_field in aggregates + groupbys:
+            final_meta["fields"][resolved_field.public_alias] = resolved_field.search_type
+
+        for timeseries in rpc_response.result_timeseries:
+            processed = cls.process_timeseries_list([timeseries])
+            if len(result.timeseries) == 0:
+                result = processed
+            else:
+                for attr in ["timeseries", "confidence", "sample_count", "sampling_rate"]:
+                    for existing, new in zip(getattr(result, attr), getattr(processed, attr)):
+                        existing.update(new)
+        if len(result.timeseries) == 0:
+            # The rpc only zerofills for us when there are results, if there aren't any we have to do it ourselves
+            result.timeseries = zerofill(
+                [],
+                params.start_date,
+                params.end_date,
+                params.timeseries_granularity_secs,
+                ["time"],
+            )
+
+        return SnubaTSResult(
+            {"data": result.timeseries, "processed_timeseries": result, "meta": final_meta},
+            params.start,
+            params.end,
+            params.granularity_secs,
+        )
+
+    @classmethod
+    @sentry_sdk.trace
+    def run_grouped_timeseries_query(
+        cls,
+        *,
+        params: SnubaParams,
+        query_string: str,
+        y_axes: list[str],
+        groupby: list[str],
+        referrer: str,
+        config: SearchResolverConfig,
+        sampling_mode: SAMPLING_MODES | None = None,
+    ) -> list[dict[str, Any]]:
+        """
+        Run a timeseries query grouped by the specified columns.
+
+        Returns a flat list of dicts, each containing:
+        - The groupby column values (using public aliases)
+        - 'time': The bucket timestamp (as epoch seconds)
+        - The y_axes aggregate values for that bucket
+
+        This is similar to run_top_events_timeseries_query but without the
+        "top N" filtering - it returns all groups matching the query.
+
+        Example:
+            result = Occurrences.run_grouped_timeseries_query(
+                params=snuba_params,
+                query_string="group_id:123 OR group_id:456",
+                y_axes=["count()"],
+                groupby=["project_id", "group_id"],
+                referrer="my_referrer",
+                config=SearchResolverConfig(),
+            )
+            # Returns:
+            # [
+            #     {"project_id": 1, "group_id": 123, "time": 1734220800, "count()": 5},
+            #     {"project_id": 1, "group_id": 123, "time": 1734224400, "count()": 3},
+            #     {"project_id": 1, "group_id": 456, "time": 1734220800, "count()": 10},
+            #     ...
+            # ]
+        """
+        cls.validate_granularity(params)
+        search_resolver = cls.get_resolver(params, config)
+
+        # Build and run the timeseries query with groupby
+        rpc_request, _aggregates, groupbys_resolved = cls.get_timeseries_query(
+            search_resolver=search_resolver,
+            params=params,
+            query_string=query_string,
+            y_axes=y_axes,
+            groupby=groupby,
+            referrer=referrer,
+            sampling_mode=sampling_mode,
+        )
+
+        rpc_response = cls._run_timeseries_rpc(params.debug, rpc_request)
+
+        # Build a mapping from internal names to public aliases for groupby columns
+        groupby_internal_to_public: dict[str, str] = {
+            col.internal_name: col.public_alias for col in groupbys_resolved
+        }
+
+        # Group timeseries by their groupby attributes, then merge aggregates.
+        # This handles multiple y_axes correctly by merging them into the same rows.
+        results_by_key: dict[tuple, dict[int, dict[str, Any]]] = {}
+
+        for timeseries in rpc_response.result_timeseries:
+            # Extract groupby values using public aliases
+            groupby_values: dict[str, Any] = {}
+            for internal_name, value in timeseries.group_by_attributes.items():
+                public_alias = groupby_internal_to_public.get(internal_name)
+                if public_alias:
+                    groupby_values[public_alias] = process_value(value)
+
+            # Create a hashable key from groupby values
+            groupby_key = tuple(sorted(groupby_values.items()))
+
+            if groupby_key not in results_by_key:
+                results_by_key[groupby_key] = {}
+
+            # Merge each bucket's aggregate value into the result
+            for i, bucket in enumerate(timeseries.buckets):
+                time_key = bucket.seconds
+
+                if time_key not in results_by_key[groupby_key]:
+                    results_by_key[groupby_key][time_key] = {
+                        **groupby_values,
+                        "time": time_key,
+                    }
+
+                # Add/merge aggregate value
+                if i < len(timeseries.data_points):
+                    data_point = timeseries.data_points[i]
+                    results_by_key[groupby_key][time_key][timeseries.label] = process_value(
+                        data_point.data
+                    )
+                else:
+                    results_by_key[groupby_key][time_key][timeseries.label] = 0
+
+        # Flatten the nested dict into a list
+        results: list[dict[str, Any]] = []
+        for time_dict in results_by_key.values():
+            results.extend(time_dict.values())
+
+        return results
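Reviewer note: the row merging above relies on two details worth calling out. Each requested y-axis comes back from the RPC as its own `result_timeseries` carrying the same `group_by_attributes`, and `tuple(sorted(...))` turns those attributes into a stable hashable key, so every aggregate for the same (group, bucket) pair lands in one row. A toy illustration of that keying and merging idea, with plain dicts standing in for the protobuf objects (all names here are illustrative, not part of this patch):

```python
from typing import Any

# Each entry mimics one RPC result timeseries: a label (the y-axis name),
# the group attributes, and per-bucket aggregate values.
fake_timeseries = [
    {"label": "count()", "group": {"group_id": 123}, "buckets": {0: 5, 3600: 3}},
    {"label": "avg(x)", "group": {"group_id": 123}, "buckets": {0: 2.5, 3600: 1.0}},
]

rows: dict[tuple[Any, ...], dict[int, dict[str, Any]]] = {}
for ts in fake_timeseries:
    # Sorting the items gives a stable, hashable key regardless of attribute order
    key = tuple(sorted(ts["group"].items()))
    for time, value in ts["buckets"].items():
        row = rows.setdefault(key, {}).setdefault(time, {**ts["group"], "time": time})
        row[ts["label"]] = value  # a second y-axis merges into the same row

flat = [row for by_time in rows.values() for row in by_time.values()]
assert flat[0] == {"group_id": 123, "time": 0, "count()": 5, "avg(x)": 2.5}
```

If the RPC ever returned group attributes in a different order across timeseries, the sorted-tuple key is what keeps them converging on the same row.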
diff --git a/tests/sentry/snuba/test_occurrences_rpc.py b/tests/sentry/snuba/test_occurrences_rpc.py
index 5ddc5e813b9d72..155501645ab005 100644
--- a/tests/sentry/snuba/test_occurrences_rpc.py
+++ b/tests/sentry/snuba/test_occurrences_rpc.py
@@ -1,3 +1,6 @@
+from datetime import datetime, timedelta, timezone
+
+import pytest
 from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
     AttributeAggregation,
     AttributeKey,
@@ -8,10 +11,13 @@
 )
 from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter
 
+from sentry.exceptions import InvalidSearchQuery
 from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS
 from sentry.search.eap.resolver import SearchResolver
 from sentry.search.eap.types import SearchResolverConfig
 from sentry.search.events.types import SnubaParams
+from sentry.snuba.occurrences_rpc import Occurrences
+from sentry.snuba.rpc_dataset_common import RPCBase
 from sentry.testutils.cases import TestCase
 
@@ -75,3 +81,102 @@ def test_count_aggregate(self) -> None:
         assert virtual_context is None
         assert resolved_column.public_alias == "count()"
         assert resolved_column.search_type == "integer"
+
+
+class OccurrencesTimeseriesTest(TestCase):
+    def setUp(self) -> None:
+        self.project = self.create_project(name="test")
+        self.end = datetime.now(timezone.utc)
+        self.start = self.end - timedelta(days=1)
+        self.snuba_params = SnubaParams(
+            start=self.start,
+            end=self.end,
+            granularity_secs=3600,  # 1 hour buckets
+            projects=[self.project],
+        )
+        self.config = SearchResolverConfig()
+
+    def test_get_timeseries_query_without_groupby(self) -> None:
+        """Test that the simple timeseries query is constructed correctly."""
+        resolver = Occurrences.get_resolver(self.snuba_params, self.config)
+
+        _rpc_request, aggregates, groupbys = RPCBase.get_timeseries_query(
+            search_resolver=resolver,
+            params=self.snuba_params,
+            query_string="",
+            y_axes=["count()"],
+            groupby=[],
+            referrer="test_referrer",
+            sampling_mode=None,
+        )
+
+        # Verify no groupby columns
+        assert len(groupbys) == 0
+
+        # Verify aggregate is resolved
+        assert len(aggregates) == 1
+        assert aggregates[0].public_alias == "count()"
+
+    def test_get_timeseries_query_with_groupby(self) -> None:
+        """Test that the grouped timeseries query is constructed correctly."""
+        resolver = Occurrences.get_resolver(self.snuba_params, self.config)
+
+        rpc_request, aggregates, groupbys = RPCBase.get_timeseries_query(
+            search_resolver=resolver,
+            params=self.snuba_params,
+            query_string="group_id:123",
+            y_axes=["count()"],
+            groupby=["project_id", "group_id"],
+            referrer="test_referrer",
+            sampling_mode=None,
+        )
+
+        # Verify groupby columns are resolved
+        assert len(groupbys) == 2
+        assert groupbys[0].public_alias == "project_id"
+        assert groupbys[0].internal_name == "sentry.project_id"
+        assert groupbys[1].public_alias == "group_id"
+        assert groupbys[1].internal_name == "group_id"
+
+        # Verify aggregate is resolved
+        assert len(aggregates) == 1
+        assert aggregates[0].public_alias == "count()"
+
+        # Verify RPC request has correct granularity
+        assert rpc_request.granularity_secs == 3600
+
+    def test_validate_granularity_required_for_timeseries(self) -> None:
+        """Test that granularity validation fails without granularity_secs."""
+        params_no_granularity = SnubaParams(
+            start=self.start,
+            end=self.end,
+            projects=[self.project],
+        )
+
+        with pytest.raises(InvalidSearchQuery):
+            Occurrences.run_timeseries_query(
+                params=params_no_granularity,
+                query_string="",
+                y_axes=["count()"],
+                referrer="test",
+                config=self.config,
+                sampling_mode=None,
+            )
+
+    def test_validate_granularity_required_for_grouped_timeseries(self) -> None:
+        """Test that granularity validation fails without granularity_secs."""
+        params_no_granularity = SnubaParams(
+            start=self.start,
+            end=self.end,
+            projects=[self.project],
+        )
+
+        with pytest.raises(InvalidSearchQuery):
+            Occurrences.run_grouped_timeseries_query(
+                params=params_no_granularity,
+                query_string="",
+                y_axes=["count()"],
+                groupby=["project_id", "group_id"],
+                referrer="test",
+                config=self.config,
+            )
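Since `run_grouped_timeseries_query` returns flat (group, time, value) rows rather than one series per group, callers that chart the data will usually pivot it first. A minimal consumer-side sketch of that transform, assuming the flat row shape documented in the docstring above (`pivot_rows` and the sample `rows` are hypothetical, not part of this patch):

```python
from collections import defaultdict
from typing import Any

def pivot_rows(
    rows: list[dict[str, Any]], groupby: list[str], y_axis: str
) -> dict[tuple[Any, ...], list[tuple[int, Any]]]:
    """Pivot flat (group, time, value) rows into one time-sorted series per group."""
    series: dict[tuple[Any, ...], list[tuple[int, Any]]] = defaultdict(list)
    for row in rows:
        key = tuple(row[col] for col in groupby)
        series[key].append((row["time"], row.get(y_axis, 0)))
    # Buckets for a group may arrive in any order, so sort each series by timestamp
    return {key: sorted(points) for key, points in series.items()}

rows = [
    {"project_id": 1, "group_id": 123, "time": 1734224400, "count()": 3},
    {"project_id": 1, "group_id": 123, "time": 1734220800, "count()": 5},
    {"project_id": 1, "group_id": 456, "time": 1734220800, "count()": 10},
]
assert pivot_rows(rows, ["project_id", "group_id"], "count()")[(1, 123)] == [
    (1734220800, 5),
    (1734224400, 3),
]
```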