From 673d53609095af1c933fdec6f653aaa1d8fe2e7b Mon Sep 17 00:00:00 2001 From: Charles Paul Date: Thu, 11 Dec 2025 21:29:39 -0800 Subject: [PATCH 1/3] feat(occurrences): Add RPCBase boilerplate There's a lot of good work & validation in this framework. Let's pick it up and use it for our own ends. This PR adds an implementation of RPCBase for use with Occurrence trace items. --- .../search/eap/occurrences/definitions.py | 40 ++++++ src/sentry/snuba/occurrences_rpc.py | 114 ++++++++++++++++++ 2 files changed, 154 insertions(+) create mode 100644 src/sentry/search/eap/occurrences/definitions.py create mode 100644 src/sentry/snuba/occurrences_rpc.py diff --git a/src/sentry/search/eap/occurrences/definitions.py b/src/sentry/search/eap/occurrences/definitions.py new file mode 100644 index 00000000000000..e18ca749d5bffc --- /dev/null +++ b/src/sentry/search/eap/occurrences/definitions.py @@ -0,0 +1,40 @@ +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey + +from sentry.search.eap.columns import ColumnDefinitions, ResolvedAttribute +from sentry.search.eap.common_columns import COMMON_COLUMNS + +OCCURRENCES_ALWAYS_PRESENT_ATTRIBUTES = [ + AttributeKey(name="group_id", type=AttributeKey.Type.TYPE_INT), +] + + +OCCURRENCE_COLUMNS = { + column.public_alias: column + for column in ( + COMMON_COLUMNS + + [ + ResolvedAttribute( + public_alias="id", + internal_name="sentry.item_id", + search_type="string", + ), + ResolvedAttribute( + public_alias="group_id", + internal_name="group_id", + search_type="integer", + ), + ] + ) +} + +OCCURRENCE_DEFINITIONS = ColumnDefinitions( + aggregates={}, # c.f. SPAN_AGGREGATE_DEFINITIONS when we're ready. + formulas={}, + columns=OCCURRENCE_COLUMNS, + contexts={}, + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE, + filter_aliases={}, + alias_to_column=None, + column_to_alias=None, +) diff --git a/src/sentry/snuba/occurrences_rpc.py b/src/sentry/snuba/occurrences_rpc.py new file mode 100644 index 00000000000000..8ff0976d3e8bdb --- /dev/null +++ b/src/sentry/snuba/occurrences_rpc.py @@ -0,0 +1,114 @@ +import logging + +import sentry_sdk +from sentry_protos.snuba.v1.request_common_pb2 import PageToken + +from sentry.search.eap.columns import ColumnDefinitions, ResolvedAttribute +from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS +from sentry.search.eap.resolver import SearchResolver +from sentry.search.eap.types import AdditionalQueries, EAPResponse, SearchResolverConfig +from sentry.search.events.types import SAMPLING_MODES, SnubaParams +from sentry.snuba import rpc_dataset_common + +logger = logging.getLogger("sentry.snuba.occurrences_rpc") + + +class OccurrencesRPC(rpc_dataset_common.RPCBase): + DEFINITIONS = OCCURRENCE_DEFINITIONS + + @classmethod + @sentry_sdk.trace + def run_table_query( + cls, + *, + params: SnubaParams, + query_string: str, + selected_columns: list[str], + orderby: list[str] | None, + offset: int, + limit: int, + referrer: str, + config: SearchResolverConfig, + sampling_mode: SAMPLING_MODES | None = None, + equations: list[str] | None = None, + search_resolver: SearchResolver | None = None, + page_token: PageToken | None = None, + additional_queries: AdditionalQueries | None = None, + debug: bool = False, + ) -> EAPResponse: + return cls._run_table_query( + rpc_dataset_common.TableQuery( + query_string=query_string, + selected_columns=selected_columns, + equations=equations, + orderby=orderby, + offset=offset, + limit=limit, 
+                referrer=referrer,
+                sampling_mode=sampling_mode,
+                resolver=search_resolver or cls.get_resolver(params, config),
+                page_token=page_token,
+            ),
+            debug,
+        )
+
+    @classmethod
+    @sentry_sdk.trace
+    def run_table_query_with_tags(
+        cls,
+        tag_names: set[str],
+        *,
+        params: SnubaParams,
+        query_string: str,
+        selected_columns: list[str],
+        orderby: list[str] | None,
+        offset: int,
+        limit: int,
+        referrer: str,
+        config: SearchResolverConfig,
+        sampling_mode: SAMPLING_MODES | None = None,
+        equations: list[str] | None = None,
+        page_token: PageToken | None = None,
+        additional_queries: AdditionalQueries | None = None,
+        debug: bool = False,
+    ) -> EAPResponse:
+        """
+        Runs a query whose selected_columns are extended with a column for each
+        tag in tag_names. Tag names should be unformatted (e.g. {"foo", "bar"});
+        they are wrapped into tags[foo], tags[bar] form here.
+        """
+
+        columns = OccurrencesRPC.DEFINITIONS.columns.copy()
+        for name in tag_names:
+            tag_name = f"tags[{name}]"
+            columns[tag_name] = ResolvedAttribute(
+                public_alias=tag_name,
+                internal_name=tag_name,
+                search_type="string",
+            )
+
+        definitions = ColumnDefinitions(
+            aggregates=OccurrencesRPC.DEFINITIONS.aggregates,
+            formulas=OccurrencesRPC.DEFINITIONS.formulas,
+            columns=columns,
+            contexts=OccurrencesRPC.DEFINITIONS.contexts,
+            trace_item_type=OccurrencesRPC.DEFINITIONS.trace_item_type,
+            filter_aliases=OccurrencesRPC.DEFINITIONS.filter_aliases,
+            alias_to_column=OccurrencesRPC.DEFINITIONS.alias_to_column,
+            column_to_alias=OccurrencesRPC.DEFINITIONS.column_to_alias,
+        )
+
+        return cls._run_table_query(
+            rpc_dataset_common.TableQuery(
+                query_string=query_string,
+                selected_columns=selected_columns,
+                equations=equations,
+                orderby=orderby,
+                offset=offset,
+                limit=limit,
+                referrer=referrer,
+                sampling_mode=sampling_mode,
+                resolver=SearchResolver(params=params, config=config, definitions=definitions),
+                page_token=page_token,
+            ),
+            debug,
+        )

From f76b3c99a13cbe75967f1a2a0bd7d685234a6d18 Mon Sep 17 00:00:00 2001
From: Charles Paul
Date: Thu, 11 Dec 2025 21:13:56 -0800
Subject: [PATCH 2/3] feat(occurrences): Add rollout utils
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We want to control how reads from Occurrences on EAP happen. Generally, we
want it to look like:

* We roll out ingestion to some % of a region
* (Optional pause to wait for some data)
* We roll out double-read to the region
* The snuba & EAP results will initially differ (EAP doesn't yet have full
  retention for the region) but should converge to zero differences over time.

Then, for each new read we onboard:

* Gate reading at all behind the standard rollout
* Gate using the data behind the callsite allowlist
* Set up a reasonable_match comparator
* Land PR.
* Observe data for callsite.
* Should have 100% reasonable_match rate.
* Should have exact_match rate go to 100% over time.
* Once exact_match has been at 100% for some time, add callsite to the
  allowlist (the intended callsite pattern is sketched below).
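Concretely, a gated callsite could look like the following (the callsite name
and the query_snuba/query_eap helpers are hypothetical; the rollout_utils
functions are the ones added in this patch):

    from sentry.search.eap.occurrences.rollout_utils import (
        should_callsite_use_eap_data_in_read,
        should_double_read_from_eap,
        validate_read,
    )

    def get_tag_counts(group):  # hypothetical callsite
        snuba_data = query_snuba(group)  # the existing read stays untouched
        if not should_double_read_from_eap():
            return snuba_data

        eap_data = query_eap(group)  # the new EAP read
        validate_read(
            snuba_data,
            eap_data,
            "get_tag_counts",
            is_null_result=not snuba_data,
            # "Reasonable" while EAP backfills: EAP should never see more
            # rows than snuba does.
            reasonable_match_comparator=lambda s, e: e is not None and len(e) <= len(s),
        )
        if should_callsite_use_eap_data_in_read("get_tag_counts"):
            return eap_data
        return snuba_data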
---
 src/sentry/options/defaults.py                | 20 ++++++
 .../search/eap/occurrences/rollout_utils.py   | 49 +++++++++++++++++++
 2 files changed, 69 insertions(+)
 create mode 100644 src/sentry/search/eap/occurrences/rollout_utils.py

diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index 7b79d6f93418ac..ca9a6697b4af77 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -3681,6 +3681,26 @@
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
 
+# Controls whether an org should read data both from Snuba and EAP.
+# Will not use or display the EAP data to the user; rather, will just compare the
+# data from each source and log whether they match.
+register(
+    "eap.occurrences.should_double_read",
+    type=Bool,
+    default=False,
+    flags=FLAG_MODIFIABLE_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
+)
+
+# Controls whether a callsite should use EAP data instead of Snuba data.
+# Callsites should only be added after they're known to be safe.
+register(
+    "eap.occurrences.callsites_using_eap_data_allowlist",
+    type=Sequence,
+    default=[],
+    flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
+)
+
+
 # Killswitch for LLM issue detection
 register(
     "issue-detection.llm-detection.enabled",
diff --git a/src/sentry/search/eap/occurrences/rollout_utils.py b/src/sentry/search/eap/occurrences/rollout_utils.py
new file mode 100644
index 00000000000000..8f80293c0ddc44
--- /dev/null
+++ b/src/sentry/search/eap/occurrences/rollout_utils.py
@@ -0,0 +1,49 @@
+from collections.abc import Callable
+from typing import Any
+
+from sentry import options
+from sentry.utils import metrics
+
+
+def should_double_read_from_eap() -> bool:
+    return options.get("eap.occurrences.should_double_read")
+
+
+def should_callsite_use_eap_data_in_read(callsite: str) -> bool:
+    return callsite in options.get("eap.occurrences.callsites_using_eap_data_allowlist")
+
+
+def validate_read(
+    snuba_data: Any,
+    eap_data: Any,
+    callsite: str,
+    is_null_result: bool | None = None,
+    reasonable_match_comparator: Callable[[Any, Any], bool] | None = None,
+) -> None:
+    """
+    Checks whether a read from EAP Occurrences matches exactly with a read from snuba.
+
+    Inputs:
+    * snuba_data: Some data from Snuba (e.g. dict[str, str])
+    * eap_data: Some data from EAP (in a format expected to match snuba_data)
+    * callsite: Where your read is taking place.
+    * is_null_result: Whether the result is a "null result" (e.g. an empty array).
+      This helps us determine whether a "match" is significant.
+    * reasonable_match_comparator: None, or a function taking snuba_data & eap_data
+      and returning True if the read is "reasonable" and False otherwise.
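+
+    Example (hypothetical data shapes):
+        validate_read(
+            snuba_rows,
+            eap_rows,
+            "group_tag_values",
+            is_null_result=len(snuba_rows) == 0,
+            # Reasonable while EAP backfills: EAP should not return more rows
+            # than snuba does.
+            reasonable_match_comparator=lambda s, e: e is not None and len(e) <= len(s),
+        )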
+ """ + tags = { + "callsite": callsite, + "exact_match": snuba_data == eap_data, + "source_of_truth": "eap" if should_callsite_use_eap_data_in_read(callsite) else "snuba", + } + + if is_null_result is not None: + tags["is_null_result"] = is_null_result + + if reasonable_match_comparator is not None: + tags["reasonable_match"] = reasonable_match_comparator(snuba_data, eap_data) + + metrics.incr( + "eap.occurrences.validate_reads", + tags=tags, + ) From 97a1bfb4a361cb645bb6b059333069e50e686523 Mon Sep 17 00:00:00 2001 From: Charles Paul Date: Thu, 11 Dec 2025 21:33:25 -0800 Subject: [PATCH 3/3] WIP: Tagstore read from Occurrences EAP --- src/sentry/tagstore/snuba/backend.py | 434 ++++++++++++++++++++++++++- 1 file changed, 428 insertions(+), 6 deletions(-) diff --git a/src/sentry/tagstore/snuba/backend.py b/src/sentry/tagstore/snuba/backend.py index 7a6b1c437f2223..1f9848142766b2 100644 --- a/src/sentry/tagstore/snuba/backend.py +++ b/src/sentry/tagstore/snuba/backend.py @@ -1,22 +1,34 @@ from __future__ import annotations import functools +import logging import os import re from collections import defaultdict from collections.abc import Iterable, MutableMapping, Sequence -from datetime import datetime, timedelta, timezone +from datetime import UTC, datetime, timedelta from typing import Any, Never, Protocol, TypedDict import sentry_sdk from dateutil.parser import parse as parse_datetime from django.core.cache import cache +from sentry_protos.snuba.v1.endpoint_get_trace_pb2 import GetTraceRequest +from sentry_protos.snuba.v1.endpoint_get_traces_pb2 import GetTracesRequest, TraceAttribute +from sentry_protos.snuba.v1.endpoint_trace_item_attributes_pb2 import TraceItemAttributeNamesRequest +from sentry_protos.snuba.v1.endpoint_trace_item_stats_pb2 import ( + AttributeDistributionsRequest, + StatsType, + TraceItemStatsRequest, +) +from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue, IntArray +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter from sentry_relay.consts import SPAN_STATUS_CODE_TO_NAME from snuba_sdk import Column, Condition, Direction, Entity, Function, Op, OrderBy, Query, Request from sentry import features, options from sentry.api.paginator import SequencePaginator -from sentry.api.utils import default_start_end_dates +from sentry.api.utils import default_start_end_dates, handle_query_errors from sentry.issues.grouptype import GroupCategory from sentry.models.group import Group from sentry.models.organization import Organization @@ -26,6 +38,11 @@ from sentry.models.releaseprojectenvironment import ReleaseProjectEnvironment from sentry.models.releases.release_project import ReleaseProject from sentry.replays.query import query_replays_dataset_tagkey_values +from sentry.search.eap.columns import ColumnDefinitions, ResolvedAttribute +from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS +from sentry.search.eap.occurrences.rollout_utils import should_double_read_from_eap, validate_read +from sentry.search.eap.resolver import SearchResolver +from sentry.search.eap.types import SearchResolverConfig from sentry.search.events.constants import ( PROJECT_ALIAS, RELEASE_ALIAS, @@ -37,12 +54,14 @@ ) from sentry.search.events.fields import FIELD_ALIASES from sentry.search.events.filter import _flip_field_sort +from sentry.search.events.types import SnubaParams from sentry.snuba.dataset import Dataset +from 
sentry.snuba.occurrences_rpc import OccurrencesRPC from sentry.snuba.referrer import Referrer from sentry.tagstore.base import TOP_VALUES_DEFAULT_LIMIT, TagKeyStatus, TagStorage from sentry.tagstore.exceptions import GroupTagKeyNotFound, TagKeyNotFound from sentry.tagstore.types import GroupTagKey, GroupTagValue, TagKey, TagValue -from sentry.utils import metrics, snuba +from sentry.utils import metrics, snuba, snuba_rpc from sentry.utils.hashlib import md5_text from sentry.utils.snuba import ( _prepare_start_end, @@ -52,6 +71,8 @@ raw_snql_query, ) +logger = logging.getLogger("sentry.tagstore") + _max_unsampled_projects = 50 if os.environ.get("SENTRY_SINGLE_TENANT"): # This is a patch we used to have in single-tenant, but moving here @@ -93,7 +114,7 @@ def is_fuzzy_numeric_key(key): def fix_tag_value_data(data): for key, transformer in tag_value_data_transformers.items(): if key in data: - data[key] = transformer(data[key]).replace(tzinfo=timezone.utc) + data[key] = transformer(data[key]).replace(tzinfo=UTC) return data @@ -158,11 +179,375 @@ def _make_result[T, U]( ) +def debug_log(*ss: str) -> None: + logger.info("\n\n") + for s in ss: + logger.info(s) + logger.info("\n\n") + + +def eap_get_tags_names_for_group(group: Group) -> set[str]: + start, end = default_start_end_dates() + params = SnubaParams( + start=start, + end=end, + projects=[group.project], + organization=group.project.organization, + ) + + column_definitions = OCCURRENCE_DEFINITIONS + resolver = SearchResolver( + params=params, + config=SearchResolverConfig(auto_fields=True), + definitions=column_definitions, + ) + referrer = Referrer.TAGSTORE__GET_TAG_KEYS_AND_TOP_VALUES + meta = resolver.resolve_meta(referrer=referrer) + meta.trace_item_type = TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE + rpc_request = TraceItemAttributeNamesRequest( + meta=meta, + limit=9999, + page_token=PageToken(offset=0), + type=AttributeKey.Type.TYPE_STRING, + value_substring_match="tags", + intersecting_attributes_filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="group_id", type=AttributeKey.Type.TYPE_INT), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_int=group.id), + ) + ), + ) + + with handle_query_errors(): + rpc_response = snuba_rpc.attribute_names_rpc(rpc_request) + + tags = set() + for attr in rpc_response.attributes: + if attr.name.startswith("tags["): + tags.add(attr.name[5:-1]) + + return tags + + +def eap_get_tags_for_group(group: Group) -> None: + names = eap_get_tags_names_for_group(group) + start, end = default_start_end_dates() + params = SnubaParams( + start=start, + end=end, + projects=[group.project], + organization=group.project.organization, + ) + referrer = Referrer.TAGSTORE__GET_TAG_KEYS_AND_TOP_VALUES + config = SearchResolverConfig(auto_fields=True) + + columns = OccurrencesRPC.DEFINITIONS.columns.copy() + for name in names: + tag_name = f"tags[{name}]" + columns[tag_name] = ResolvedAttribute( + public_alias=tag_name, + internal_name=tag_name, + search_type="string", + ) + + definitions = ColumnDefinitions( + aggregates=OccurrencesRPC.DEFINITIONS.aggregates, + formulas=OccurrencesRPC.DEFINITIONS.formulas, + columns=columns, + contexts=OccurrencesRPC.DEFINITIONS.contexts, + trace_item_type=OccurrencesRPC.DEFINITIONS.trace_item_type, + filter_aliases=OccurrencesRPC.DEFINITIONS.filter_aliases, + alias_to_column=OccurrencesRPC.DEFINITIONS.alias_to_column, + column_to_alias=OccurrencesRPC.DEFINITIONS.column_to_alias, + ) + + response = OccurrencesRPC.run_table_query( + 
params=params, + query_string=f"group_id:[{group.id}]", # f"group_id:[{group.id}]", + selected_columns=[ + "sentry.timestamp", + *[f"tags[{name}]" for name in names], + ], # TODO: Need to pass tagKey columns in here? + equations=[], + orderby=None, + offset=0, + limit=99999, + referrer=referrer, + config=config, + sampling_mode="NORMAL", + search_resolver=SearchResolver(params=params, config=config, definitions=definitions), + ) + + debug_log( + "HERE'S THE RAW RESPONSE FROM TABLE READ", + str(response), + ) + """ + start, end = default_start_end_dates() + params = SnubaParams( + start=start, + end=end, + projects=[group.project], + organization=group.project.organization, + ) + + column_definitions = OCCURRENCE_DEFINITIONS + resolver = SearchResolver( + params=params, + config=SearchResolverConfig(auto_fields=True), + definitions=column_definitions, + ) + referrer = Referrer.TAGSTORE__GET_TAG_KEYS_AND_TOP_VALUES + meta = resolver.resolve_meta(referrer=referrer) + + stats_type = StatsType( + attribute_distributions=AttributeDistributionsRequest( + max_buckets=100, + max_attributes=9999999, + attributes=[ + AttributeKey(name=f"tags[{name}]", type=AttributeKey.Type.TYPE_STRING) + for name in names + ] + + [AttributeKey(name="tags[FAKEFAKEFAKE]", type=AttributeKey.Type.TYPE_STRING)], + ) + ) + + rpc_request = TraceItemStatsRequest( + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="group_id", type=AttributeKey.Type.TYPE_INT), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_int=group.id), + ) + ), + meta=meta, + stats_types=[stats_type], + ) + item_stats_response = snuba_rpc.trace_item_stats_rpc(rpc_request) + debug_log( + "Item Stats Response:", + str(item_stats_response), + ) + """ + + pass + + +def attempt_to_get_tag_values(group: Group) -> None: + """ + Ideal output here is dict[TagName, TagValue]... + ... but I'll take any values to see that the query is working. + """ + params = SnubaParams( + start=datetime.now() - timedelta(days=30), + end=datetime.now() + timedelta(days=30), + projects=[group.project], + organization=group.project.organization, + ) + referrer = Referrer.TAGSTORE__GET_TAG_KEYS_AND_TOP_VALUES + response = OccurrencesRPC.run_table_query( + params=params, + query_string=f"group_id:[{group.id}]", # f"group_id:[{group.id}]", + selected_columns=[ + "sentry.timestamp", + ], # TODO: Need to pass tagKey columns in here? 
+        equations=[],
+        orderby=None,
+        offset=0,
+        limit=99999,
+        referrer=referrer,
+        config=SearchResolverConfig(auto_fields=True),
+        sampling_mode="NORMAL",
+    )
+
+    debug_log(
+        "HERE'S THE RAW RESPONSE FROM TABLE READ",
+        str(response),
+    )
+
+    # TODO delete dupes
+    params = SnubaParams(
+        start=datetime.now() - timedelta(days=30),
+        end=datetime.now(),
+        projects=[group.project],
+        organization=group.project.organization,
+    )
+
+    column_definitions = OCCURRENCE_DEFINITIONS
+    resolver = SearchResolver(
+        params=params,
+        config=SearchResolverConfig(auto_fields=True),
+        definitions=column_definitions,
+    )
+    query_filter, _, _ = resolver.resolve_query(
+        None
+        # f"group_id:{group.id}",
+    )
+    referrer = Referrer.TAGSTORE__GET_TAG_KEYS_AND_TOP_VALUES
+    meta = resolver.resolve_meta(referrer=referrer)
+    meta.trace_item_type = TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE
+
+    group_filter = TraceItemFilter(
+        comparison_filter=ComparisonFilter(
+            key=AttributeKey(name="group_id", type=AttributeKey.Type.TYPE_INT),
+            op=ComparisonFilter.OP_IN,
+            value=AttributeValue(val_int_array=IntArray(values=[group.id])),
+        )
+    )
+    get_traces_request = GetTracesRequest(
+        meta=meta,
+        page_token=PageToken(offset=0),
+        limit=9999,
+        filters=[
+            GetTracesRequest.TraceFilter(
+                item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE,
+                filter=group_filter,
+            )
+        ],
+        order_by=[],
+        attributes=[
+            TraceAttribute(key=TraceAttribute.Key.KEY_TRACE_ID),
+        ],
+    )
+
+    get_traces_response = snuba_rpc.get_traces_rpc(get_traces_request)
+    trace_ids = [t.attributes[0].value.val_str for t in list(get_traces_response.traces)]
+    debug_log(
+        f"FOUND GET TRACES RESPONSE with {len(trace_ids)} TRACES:",
+        str(trace_ids),
+    )
+
+    tag_to_values_to_counts: defaultdict[str, defaultdict[str, int]] = defaultdict(
+        lambda: defaultdict(int)
+    )
+
+    # THIS WORKS BUT IS NOT IDEAL
+    for trace_id in trace_ids:
+        get_trace_request = GetTraceRequest(
+            meta=meta,
+            trace_id=trace_id,
+            # when this is None we just get the default limit
+            limit=99999999,
+            items=[
+                GetTraceRequest.TraceItem(
+                    item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE,
+                    attributes=[],
+                )
+            ],
+        )
+        get_trace_response = snuba_rpc.get_trace_rpc(get_trace_request)
+        trace_item_groups = get_trace_response.item_groups
+        assert len(trace_item_groups) == 1
+        assert trace_item_groups[0].item_type == TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE
+
+        trace_items = trace_item_groups[0].items
+        for trace_item in trace_items:
+            for attribute in trace_item.attributes:
+                if attribute.key.name.startswith("tags["):
+                    tag_name = attribute.key.name[5:-1]
+                    assert (
+                        attribute.key.type == AttributeKey.Type.TYPE_STRING
+                    ), "TAGS of weird type?"
+                    tag_value = attribute.value.val_str
+                    tag_to_values_to_counts[tag_name][tag_value] += 1
+
+    s = []
+    for tag in tag_to_values_to_counts.keys():
+        s.append(f"{tag}:")
+        for value in tag_to_values_to_counts[tag]:
+            s.append(f"\t{value}: {tag_to_values_to_counts[tag][value]}")
+
+    debug_log("All tags w/ value counts:", *s)
+
+    # TODO: PART 2: Get working response via table query (with attempt_to_get_tag_columns help)
+
+
 class SnubaTagStorage(TagStorage):
     key_column = "tags_key"
     value_column = "tags_value"
     format_string = "tags[{}]"
 
+    def __eap_get_tags_for_group(
+        self, tag_key: str, group: Group, limit: int | None, **kwargs
+    ) -> GroupTagKey | None:
+        """
+        tag_key should be unformatted (i.e. "foo" rather than "tags[foo]")
"foo" rather than "tags[foo]") + """ + attr_name = f"tags[{tag_key}]" + if limit is None or limit > 100: + # EAP imposes a limit of 100 buckets max + limit = 100 + + params = SnubaParams( + start=kwargs.get("start"), + end=kwargs.get("end"), + projects=[group.project], + organization=group.project.organization, + ) + + column_definitions = OCCURRENCE_DEFINITIONS + resolver = SearchResolver( + params=params, + config=SearchResolverConfig(auto_fields=True), + definitions=column_definitions, + ) + referrer = Referrer.TAGSTORE__GET_TAG_KEYS_AND_TOP_VALUES + meta = resolver.resolve_meta(referrer=referrer) + + stats_type = StatsType( + attribute_distributions=AttributeDistributionsRequest( + max_buckets=limit, + max_attributes=1, + attributes=[AttributeKey(name=attr_name, type=AttributeKey.Type.TYPE_STRING)], + ) + ) + + # TODO: It should be possible for us to run this request without a group. + rpc_request = TraceItemStatsRequest( + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="group_id", type=AttributeKey.Type.TYPE_INT), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_int=group.id), + ) + ), + meta=meta, + stats_types=[stats_type], + ) + with handle_query_errors(): + item_stats_response = snuba_rpc.trace_item_stats_rpc(rpc_request) + + data = item_stats_response.results[0].attribute_distributions.attributes + if len(data) == 0: + return None + assert len(data) == 1 and data[0].attribute_name == attr_name + + value_buckets = data[0].buckets + + top_values = tuple( + GroupTagValue( + group_id=group.id, + key=tag_key, + value=bucket.label, + times_seen=int(bucket.value), + # TODO: Find way to fetch first/last seen. + first_seen=None, + last_seen=None, + ) + for bucket in value_buckets + ) + + return GroupTagKey( + group_id=group.id, + key=tag_key, + values_seen=len(data[0].buckets), + count=sum([int(bucket.value) for bucket in value_buckets]), + top_values=top_values, + ) + def __get_tag_key_and_top_values( self, project_id, @@ -214,8 +599,9 @@ def __get_tag_key_and_top_values( if not has_non_empty_value: raise TagKeyNotFound if group is None else GroupTagKeyNotFound + snuba_output: TagKey | GroupTagKey if group is None: - return _make_result( + snuba_output = _make_result( key=key, totals=totals, result=result, @@ -223,7 +609,7 @@ def __get_tag_key_and_top_values( value_ctor=TagValue, ) else: - return _make_result( + snuba_output = _make_result( key=key, totals=totals, result=result, @@ -231,6 +617,34 @@ def __get_tag_key_and_top_values( value_ctor=functools.partial(GroupTagValue, group_id=group.id), ) + if should_double_read_from_eap(): + + def reasonable_group_tag_key_match(snuba: GroupTagKey, eap: GroupTagKey) -> bool: + if snuba.group_id != eap.group_id or snuba.key != eap.key: + return False + + if snuba.values_seen < eap.values_seen or snuba.count < eap.count: + return False + + snuba_values = {v.value: v for v in snuba.top_values} + for eap_value in eap.top_values: + if eap_value in snuba_values: + if snuba_values[eap_value].times_seen < eap_value.times_seen: + return False + return True + + eap_output = self.__eap_get_tags_for_group(key, group, limit, **kwargs) + validate_read( + snuba_output, + eap_output, + "__get_tag_key_and_top_values", + is_null_result=eap_output.count == 0, + reasonable_match_comparator=reasonable_group_tag_key_match, + ) + # TODO: Once we have first/last seen, hook into allowlist to return EAP data + + return snuba_output + def __get_tag_keys( self, project_id: int, @@ -352,6 +766,7 @@ def 
         metrics.incr("testing.tagstore.cache_tag_key.miss")
 
         if result is None:
+            debug_log("TAG KEYS", str(self.key_column), str(filters), str(aggregations))
             result = snuba.query(
                 dataset=dataset,
                 start=start,
@@ -725,6 +1140,8 @@ def get_group_tag_keys_and_top_values(
         tenant_ids=None,
         **kwargs,
     ):
+        # This is the call we ultimately want to serve from EAP.
+
         # Similar to __get_tag_key_and_top_values except we get the top values
         # for all the keys provided. value_limit in this case means the number
         # of top values for each key, so the total rows returned should be
@@ -753,6 +1170,11 @@ def get_group_tag_keys_and_top_values(
             ["max", SEEN_COLUMN, "last_seen"],
         ]
 
+        # attempt_to_get_tag_values(group)
+        # x = eap_get_tags_names_for_group(group)
+        # debug_log("Tag names", str(x))
+        eap_get_tags_for_group(group)
+
         values_by_key = snuba.query(
             dataset=dataset,
            start=kwargs.get("start"),