175 changes: 174 additions & 1 deletion src/sentry/snuba/occurrences_rpc.py
@@ -1,14 +1,19 @@
import logging
from datetime import timedelta
from typing import Any

import sentry_sdk
from sentry_protos.snuba.v1.request_common_pb2 import PageToken

from sentry.search.eap.columns import ColumnDefinitions, ResolvedAttribute
from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS
from sentry.search.eap.resolver import SearchResolver
from sentry.search.eap.sampling import events_meta_from_rpc_request_meta
from sentry.search.eap.types import AdditionalQueries, EAPResponse, SearchResolverConfig
from sentry.search.events.types import SAMPLING_MODES, SnubaParams
from sentry.search.events.types import SAMPLING_MODES, EventsMeta, SnubaParams
from sentry.snuba import rpc_dataset_common
from sentry.snuba.discover import zerofill
from sentry.utils.snuba import SnubaTSResult, process_value

logger = logging.getLogger(__name__)

@@ -111,3 +116,171 @@ def run_table_query_with_tags(
),
params.debug,
)

@classmethod
@sentry_sdk.trace
def run_timeseries_query(
cls,
*,
params: SnubaParams,
query_string: str,
y_axes: list[str],
referrer: str,
config: SearchResolverConfig,
sampling_mode: SAMPLING_MODES | None,
comparison_delta: timedelta | None = None,
additional_queries: AdditionalQueries | None = None,
) -> SnubaTSResult:
"""Run a simple timeseries query (no groupby)."""
cls.validate_granularity(params)
search_resolver = cls.get_resolver(params, config)
rpc_request, aggregates, groupbys = cls.get_timeseries_query(
search_resolver=search_resolver,
params=params,
query_string=query_string,
y_axes=y_axes,
groupby=[],
referrer=referrer,
sampling_mode=sampling_mode,
additional_queries=additional_queries,
)

"""Run the query"""
rpc_response = cls._run_timeseries_rpc(params.debug, rpc_request)

"""Process the results"""
result = rpc_dataset_common.ProcessedTimeseries()
final_meta: EventsMeta = events_meta_from_rpc_request_meta(rpc_response.meta)
for resolved_field in aggregates + groupbys:
final_meta["fields"][resolved_field.public_alias] = resolved_field.search_type

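# Each y-axis comes back as its own timeseries: the first one populates the
# result, and each subsequent one is merged bucket-by-bucket into the same rows.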
for timeseries in rpc_response.result_timeseries:
processed = cls.process_timeseries_list([timeseries])
if len(result.timeseries) == 0:
result = processed
else:
for attr in ["timeseries", "confidence", "sample_count", "sampling_rate"]:
for existing, new in zip(getattr(result, attr), getattr(processed, attr)):
existing.update(new)
if len(result.timeseries) == 0:
# The RPC only zerofills for us when there are results; if there aren't any, we have to do it ourselves
result.timeseries = zerofill(
[],
params.start_date,
params.end_date,
params.timeseries_granularity_secs,
["time"],
)

return SnubaTSResult(
{"data": result.timeseries, "processed_timeseries": result, "meta": final_meta},
params.start,
params.end,
params.granularity_secs,
)

@classmethod
@sentry_sdk.trace
def run_grouped_timeseries_query(
cls,
*,
params: SnubaParams,
query_string: str,
y_axes: list[str],
groupby: list[str],
referrer: str,
config: SearchResolverConfig,
sampling_mode: SAMPLING_MODES | None = None,
) -> list[dict[str, Any]]:
"""
Run a timeseries query grouped by the specified columns.

Returns a flat list of dicts, each containing:
- The groupby column values (using public aliases)
- 'time': The bucket timestamp (as epoch seconds)
- The y_axes aggregate values for that bucket

This is similar to run_top_events_timeseries_query but without the
"top N" filtering; it returns all groups matching the query.

Example:
result = Occurrences.run_grouped_timeseries_query(
params=snuba_params,
query_string="group_id:123 OR group_id:456",
y_axes=["count()"],
groupby=["project_id", "group_id"],
referrer="my_referrer",
config=SearchResolverConfig(),
)
# Returns:
# [
# {"project_id": 1, "group_id": 123, "time": 1734220800, "count()": 5},
# {"project_id": 1, "group_id": 123, "time": 1734224400, "count()": 3},
# {"project_id": 1, "group_id": 456, "time": 1734220800, "count()": 10},
# ...
# ]
"""
cls.validate_granularity(params)
search_resolver = cls.get_resolver(params, config)

# Build and run the timeseries query with groupby
rpc_request, _aggregates, groupbys_resolved = cls.get_timeseries_query(
search_resolver=search_resolver,
params=params,
query_string=query_string,
y_axes=y_axes,
groupby=groupby,
referrer=referrer,
sampling_mode=sampling_mode,
)

rpc_response = cls._run_timeseries_rpc(params.debug, rpc_request)

# Build a mapping from internal names to public aliases for groupby columns
groupby_internal_to_public: dict[str, str] = {
col.internal_name: col.public_alias for col in groupbys_resolved
}

# Group timeseries by their groupby attributes, then merge aggregates
# This handles multiple y_axes correctly by merging them into the same rows
results_by_key: dict[tuple, dict[int, dict[str, Any]]] = {}

for timeseries in rpc_response.result_timeseries:
# Extract groupby values using public aliases
groupby_values: dict[str, Any] = {}
for internal_name, value in timeseries.group_by_attributes.items():
public_alias = groupby_internal_to_public.get(internal_name)
if public_alias:
groupby_values[public_alias] = process_value(value)

# Create a hashable key from groupby values
groupby_key = tuple(sorted(groupby_values.items()))

if groupby_key not in results_by_key:
results_by_key[groupby_key] = {}

# Merge each bucket's aggregate value into the result
for i, bucket in enumerate(timeseries.buckets):
time_key = bucket.seconds

if time_key not in results_by_key[groupby_key]:
results_by_key[groupby_key][time_key] = {
**groupby_values,
"time": time_key,
}

# Add/merge aggregate value
if i < len(timeseries.data_points):
data_point = timeseries.data_points[i]
results_by_key[groupby_key][time_key][timeseries.label] = process_value(
data_point.data
)
else:
results_by_key[groupby_key][time_key][timeseries.label] = 0

# Flatten the nested dict into a list
results: list[dict[str, Any]] = []
for time_dict in results_by_key.values():
results.extend(time_dict.values())

return results
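A minimal consumer sketch, not part of this diff, of how the flat rows from run_grouped_timeseries_query might be pivoted into one series per group for charting; the helper name pivot_grouped_timeseries and the hard-coded keys are illustrative assumptions rather than anything added by the PR:

# Illustrative helper (not part of this PR): pivot the flat rows from
# run_grouped_timeseries_query into {group_key: [(time, value), ...]} series.
from collections import defaultdict
from typing import Any


def pivot_grouped_timeseries(
    rows: list[dict[str, Any]],
    groupby: list[str],
    y_axis: str,
) -> dict[tuple[Any, ...], list[tuple[int, Any]]]:
    # Bucket each flat row under its groupby key, keeping (time, value) pairs.
    series: dict[tuple[Any, ...], list[tuple[int, Any]]] = defaultdict(list)
    for row in rows:
        key = tuple(row.get(col) for col in groupby)
        series[key].append((row["time"], row.get(y_axis, 0)))
    # Return plain dicts with each group's points ordered by bucket timestamp.
    return {key: sorted(points) for key, points in series.items()}


# Example usage with the output shape documented in the docstring above:
# rows = Occurrences.run_grouped_timeseries_query(
#     params=snuba_params,
#     query_string="group_id:123 OR group_id:456",
#     y_axes=["count()"],
#     groupby=["project_id", "group_id"],
#     referrer="my_referrer",
#     config=SearchResolverConfig(),
# )
# series = pivot_grouped_timeseries(rows, ["project_id", "group_id"], "count()")
# series[(1, 123)] -> [(1734220800, 5), (1734224400, 3), ...]

Keeping the pivot outside the query layer fits the PR's choice to return a flat list, which callers can reshape however they need.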
105 changes: 105 additions & 0 deletions tests/sentry/snuba/test_occurrences_rpc.py
@@ -1,3 +1,6 @@
from datetime import datetime, timedelta, timezone

import pytest
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
AttributeAggregation,
AttributeKey,
@@ -8,10 +11,13 @@
)
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter

from sentry.exceptions import InvalidSearchQuery
from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS
from sentry.search.eap.resolver import SearchResolver
from sentry.search.eap.types import SearchResolverConfig
from sentry.search.events.types import SnubaParams
from sentry.snuba.occurrences_rpc import Occurrences
from sentry.snuba.rpc_dataset_common import RPCBase
from sentry.testutils.cases import TestCase


@@ -75,3 +81,102 @@ def test_count_aggregate(self) -> None:
assert virtual_context is None
assert resolved_column.public_alias == "count()"
assert resolved_column.search_type == "integer"


class OccurrencesTimeseriesTest(TestCase):
def setUp(self) -> None:
self.project = self.create_project(name="test")
self.end = datetime.now(timezone.utc)
self.start = self.end - timedelta(days=1)
self.snuba_params = SnubaParams(
start=self.start,
end=self.end,
granularity_secs=3600, # 1 hour buckets
projects=[self.project],
)
self.config = SearchResolverConfig()

def test_get_timeseries_query_without_groupby(self) -> None:
"""Test that the simple timeseries query is constructed correctly."""
resolver = Occurrences.get_resolver(self.snuba_params, self.config)

_rpc_request, aggregates, groupbys = RPCBase.get_timeseries_query(
search_resolver=resolver,
params=self.snuba_params,
query_string="",
y_axes=["count()"],
groupby=[],
referrer="test_referrer",
sampling_mode=None,
)

# Verify no groupby columns
assert len(groupbys) == 0

# Verify aggregate is resolved
assert len(aggregates) == 1
assert aggregates[0].public_alias == "count()"

def test_get_timeseries_query_with_groupby(self) -> None:
"""Test that the grouped timeseries query is constructed correctly."""
resolver = Occurrences.get_resolver(self.snuba_params, self.config)

rpc_request, aggregates, groupbys = RPCBase.get_timeseries_query(
search_resolver=resolver,
params=self.snuba_params,
query_string="group_id:123",
y_axes=["count()"],
groupby=["project_id", "group_id"],
referrer="test_referrer",
sampling_mode=None,
)

# Verify groupby columns are resolved
assert len(groupbys) == 2
assert groupbys[0].public_alias == "project_id"
assert groupbys[0].internal_name == "sentry.project_id"
assert groupbys[1].public_alias == "group_id"
assert groupbys[1].internal_name == "group_id"

# Verify aggregate is resolved
assert len(aggregates) == 1
assert aggregates[0].public_alias == "count()"

# Verify RPC request has correct granularity
assert rpc_request.granularity_secs == 3600

def test_validate_granularity_required_for_timeseries(self) -> None:
"""Test that granularity validation fails without granularity_secs."""
params_no_granularity = SnubaParams(
start=self.start,
end=self.end,
projects=[self.project],
)

with pytest.raises(InvalidSearchQuery):
Occurrences.run_timeseries_query(
params=params_no_granularity,
query_string="",
y_axes=["count()"],
referrer="test",
config=self.config,
sampling_mode=None,
)

def test_validate_granularity_required_for_grouped_timeseries(self) -> None:
"""Test that granularity validation fails without granularity_secs."""
params_no_granularity = SnubaParams(
start=self.start,
end=self.end,
projects=[self.project],
)

with pytest.raises(InvalidSearchQuery):
Occurrences.run_grouped_timeseries_query(
params=params_no_granularity,
query_string="",
y_axes=["count()"],
groupby=["project_id", "group_id"],
referrer="test",
config=self.config,
)
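
A hedged sketch, not included in this diff, of how the flattening in run_grouped_timeseries_query could be exercised without hitting Snuba by patching Occurrences._run_timeseries_rpc with lightweight stand-ins. The helper, the fake attribute values, and the assumption that this test lives in OccurrencesTimeseriesTest are all illustrative; the assertions also assume process_value passes plain ints through unchanged:

from types import SimpleNamespace
from unittest import mock


def _fake_timeseries(label, group_by, points):
    # Mimic only the attributes the flattening code reads:
    # label, group_by_attributes, buckets[i].seconds, data_points[i].data
    return SimpleNamespace(
        label=label,
        group_by_attributes=group_by,
        buckets=[SimpleNamespace(seconds=ts) for ts, _ in points],
        data_points=[SimpleNamespace(data=value) for _, value in points],
    )


def test_grouped_flattening_with_mocked_rpc(self) -> None:
    """Assumed to live in OccurrencesTimeseriesTest; not part of this diff."""
    fake_response = SimpleNamespace(
        result_timeseries=[
            _fake_timeseries(
                "count()",
                {"sentry.project_id": self.project.id, "group_id": 123},
                [(1734220800, 5), (1734224400, 3)],
            )
        ]
    )
    with mock.patch.object(
        Occurrences, "_run_timeseries_rpc", return_value=fake_response
    ):
        rows = Occurrences.run_grouped_timeseries_query(
            params=self.snuba_params,
            query_string="",
            y_axes=["count()"],
            groupby=["project_id", "group_id"],
            referrer="test",
            config=self.config,
        )

    assert len(rows) == 2
    assert rows[0]["time"] == 1734220800 and rows[0]["count()"] == 5
    assert rows[1]["time"] == 1734224400 and rows[1]["count()"] == 3
    assert all(row["group_id"] == 123 for row in rows)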