Skip to content

Commit b3ea29d

Browse files
committed
WIP: Tagstore read from Occurrences EAP
1 parent 673d536 commit b3ea29d

File tree

1 file changed

+265
-4
lines changed

1 file changed

+265
-4
lines changed

src/sentry/tagstore/snuba/backend.py

Lines changed: 265 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,29 @@
11
from __future__ import annotations
22

33
import functools
4+
import logging
45
import os
56
import re
67
from collections import defaultdict
78
from collections.abc import Iterable, MutableMapping, Sequence
8-
from datetime import datetime, timedelta, timezone
9+
from datetime import UTC, datetime, timedelta
910
from typing import Any, Never, Protocol, TypedDict
1011

1112
import sentry_sdk
1213
from dateutil.parser import parse as parse_datetime
1314
from django.core.cache import cache
15+
from sentry_protos.snuba.v1.endpoint_get_trace_pb2 import GetTraceRequest
16+
from sentry_protos.snuba.v1.endpoint_get_traces_pb2 import GetTracesRequest, TraceAttribute
17+
from sentry_protos.snuba.v1.endpoint_trace_item_attributes_pb2 import TraceItemAttributeNamesRequest
18+
from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType
19+
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue, IntArray
20+
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter
1421
from sentry_relay.consts import SPAN_STATUS_CODE_TO_NAME
1522
from snuba_sdk import Column, Condition, Direction, Entity, Function, Op, OrderBy, Query, Request
1623

1724
from sentry import features, options
1825
from sentry.api.paginator import SequencePaginator
19-
from sentry.api.utils import default_start_end_dates
26+
from sentry.api.utils import default_start_end_dates, handle_query_errors
2027
from sentry.issues.grouptype import GroupCategory
2128
from sentry.models.group import Group
2229
from sentry.models.organization import Organization
@@ -26,6 +33,10 @@
2633
from sentry.models.releaseprojectenvironment import ReleaseProjectEnvironment
2734
from sentry.models.releases.release_project import ReleaseProject
2835
from sentry.replays.query import query_replays_dataset_tagkey_values
36+
from sentry.search.eap.columns import ColumnDefinitions, ResolvedAttribute
37+
from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS
38+
from sentry.search.eap.resolver import SearchResolver
39+
from sentry.search.eap.types import SearchResolverConfig
2940
from sentry.search.events.constants import (
3041
PROJECT_ALIAS,
3142
RELEASE_ALIAS,
@@ -37,12 +48,14 @@
3748
)
3849
from sentry.search.events.fields import FIELD_ALIASES
3950
from sentry.search.events.filter import _flip_field_sort
51+
from sentry.search.events.types import SnubaParams
4052
from sentry.snuba.dataset import Dataset
53+
from sentry.snuba.occurrences_rpc import OccurrencesRPC
4154
from sentry.snuba.referrer import Referrer
4255
from sentry.tagstore.base import TOP_VALUES_DEFAULT_LIMIT, TagKeyStatus, TagStorage
4356
from sentry.tagstore.exceptions import GroupTagKeyNotFound, TagKeyNotFound
4457
from sentry.tagstore.types import GroupTagKey, GroupTagValue, TagKey, TagValue
45-
from sentry.utils import metrics, snuba
58+
from sentry.utils import metrics, snuba, snuba_rpc
4659
from sentry.utils.hashlib import md5_text
4760
from sentry.utils.snuba import (
4861
_prepare_start_end,
@@ -52,6 +65,8 @@
5265
raw_snql_query,
5366
)
5467

68+
logger = logging.getLogger("sentry.tagstore")
69+
5570
_max_unsampled_projects = 50
5671
if os.environ.get("SENTRY_SINGLE_TENANT"):
5772
# This is a patch we used to have in single-tenant, but moving here
@@ -93,7 +108,7 @@ def is_fuzzy_numeric_key(key):
93108
def fix_tag_value_data(data):
    """
    Apply the registered value transformers to ``data`` in place.

    For each key of ``tag_value_data_transformers`` that is present in
    ``data``, the stored value is passed through its transformer and the
    result's ``replace(tzinfo=UTC)`` is written back under the same key.
    The same (mutated) mapping is returned.
    """
    for field, transform in tag_value_data_transformers.items():
        if field not in data:
            continue
        converted = transform(data[field])
        data[field] = converted.replace(tzinfo=UTC)
    return data
98113

99114

@@ -158,6 +173,244 @@ def _make_result[T, U](
158173
)
159174

160175

176+
def debug_log(*ss: str) -> None:
    """
    Log each message in ``ss`` at INFO level on the module ``logger``.

    A record containing only two newlines is emitted before the first message
    and after the last one so the group stands out in the log output.
    """
    separator = "\n\n"
    logger.info(separator)
    for message in ss:
        logger.info(message)
    logger.info(separator)
181+
182+
183+
def eap_get_tags_names_for_group(group: Group) -> set[str]:
    """
    Return the tag names reported by the EAP attribute-names RPC for ``group``.

    A ``TraceItemAttributeNamesRequest`` for string attributes of occurrence
    items is sent over the window returned by ``default_start_end_dates()``,
    with a ``group_id == group.id`` comparison supplied as the request's
    ``intersecting_attributes_filter``. Returned attribute names that start
    with ``tags[`` are included with that prefix and their final character
    removed; every other attribute name is ignored.
    """
    window_start, window_end = default_start_end_dates()
    project = group.project
    snuba_params = SnubaParams(
        start=window_start,
        end=window_end,
        projects=[project],
        organization=project.organization,
    )

    search_resolver = SearchResolver(
        params=snuba_params,
        config=SearchResolverConfig(auto_fields=True),
        definitions=OCCURRENCE_DEFINITIONS,
    )
    request_meta = search_resolver.resolve_meta(
        referrer=Referrer.TAGSTORE__GET_TAG_KEYS_AND_TOP_VALUES
    )
    request_meta.trace_item_type = TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE

    group_id_filter = TraceItemFilter(
        comparison_filter=ComparisonFilter(
            key=AttributeKey(name="group_id", type=AttributeKey.Type.TYPE_INT),
            op=ComparisonFilter.OP_EQUALS,
            value=AttributeValue(val_int=group.id),
        )
    )
    names_request = TraceItemAttributeNamesRequest(
        meta=request_meta,
        limit=9999,
        page_token=PageToken(offset=0),
        type=AttributeKey.Type.TYPE_STRING,
        value_substring_match="tags",
        intersecting_attributes_filter=group_id_filter,
    )

    with handle_query_errors():
        names_response = snuba_rpc.attribute_names_rpc(names_request)

    prefix = "tags["
    # Strip the "tags[" prefix and the last character (presumably the closing "]").
    return {
        attribute.name[len(prefix) : -1]
        for attribute in names_response.attributes
        if attribute.name.startswith(prefix)
    }
225+
226+
227+
def eap_get_tags_for_group(group: Group) -> None:
    """
    Run an occurrences table query selecting every tag column of ``group`` and
    log the raw response.

    Tag names come from ``eap_get_tags_names_for_group``. Each one is added as a
    string ``ResolvedAttribute`` keyed ``tags[<name>]`` to a copy of
    ``OccurrencesRPC.DEFINITIONS.columns``; a ``SearchResolver`` built on the
    resulting definitions is handed to ``OccurrencesRPC.run_table_query``.
    Nothing is returned: the response is only passed to ``debug_log``.
    """
    tag_columns = [f"tags[{tag}]" for tag in eap_get_tags_names_for_group(group)]

    window_start, window_end = default_start_end_dates()
    project = group.project
    snuba_params = SnubaParams(
        start=window_start,
        end=window_end,
        projects=[project],
        organization=project.organization,
    )
    resolver_config = SearchResolverConfig(auto_fields=True)

    # Extend a copy of the stock column map with one string column per tag.
    base_definitions = OccurrencesRPC.DEFINITIONS
    extended_columns = base_definitions.columns.copy()
    for column in tag_columns:
        extended_columns[column] = ResolvedAttribute(
            public_alias=column,
            internal_name=column,
            search_type="string",
        )

    extended_definitions = ColumnDefinitions(
        aggregates=base_definitions.aggregates,
        formulas=base_definitions.formulas,
        columns=extended_columns,
        contexts=base_definitions.contexts,
        trace_item_type=base_definitions.trace_item_type,
        filter_aliases=base_definitions.filter_aliases,
        alias_to_column=base_definitions.alias_to_column,
        column_to_alias=base_definitions.column_to_alias,
    )
    tag_aware_resolver = SearchResolver(
        params=snuba_params,
        config=resolver_config,
        definitions=extended_definitions,
    )

    table_response = OccurrencesRPC.run_table_query(
        params=snuba_params,
        query_string=f"group_id:[{group.id}]",
        # TODO: Need to pass tagKey columns in here?
        selected_columns=["sentry.timestamp", *tag_columns],
        equations=[],
        orderby=None,
        offset=0,
        limit=99999,
        referrer=Referrer.TAGSTORE__GET_TAG_KEYS_AND_TOP_VALUES,
        config=resolver_config,
        sampling_mode="NORMAL",
        search_resolver=tag_aware_resolver,
    )

    debug_log("HERE'S THE RAW RESPONSE FROM TABLE READ", str(table_response))
281+
282+
283+
def attempt_to_get_tag_values(group: Group) -> None:
    """
    Exploratory helper: read tag values for ``group`` from EAP occurrences and
    log what comes back.

    Ideal output here is dict[TagName, TagValue]...
    ... but I'll take any values to see that the query is working.

    Two approaches are exercised; both only report through ``debug_log`` and
    nothing is returned:

    1. An ``OccurrencesRPC`` table query filtered on the group id that selects
       ``sentry.timestamp``.
    2. A ``GetTracesRequest`` filtered on the group id to collect trace ids,
       followed by one ``GetTraceRequest`` per trace id; every ``tags[...]``
       attribute on the returned occurrence items is tallied per tag name and
       value.

    Raises:
        AssertionError: if a ``GetTrace`` response does not contain exactly one
            item group of type occurrence, or a ``tags[...]`` attribute is not
            of string type.
    """
    referrer = Referrer.TAGSTORE__GET_TAG_KEYS_AND_TOP_VALUES

    # --- Approach 1: table query -------------------------------------------
    # Use a single timezone-aware "now" so both window edges derive from the
    # same instant and match the aware datetimes used elsewhere in this module.
    now = datetime.now(UTC)
    params = SnubaParams(
        start=now - timedelta(days=30),
        # NOTE(review): the window deliberately(?) extends 30 days into the
        # future, unlike the second window below — confirm this is intended.
        end=now + timedelta(days=30),
        projects=[group.project],
        organization=group.project.organization,
    )
    response = OccurrencesRPC.run_table_query(
        params=params,
        query_string=f"group_id:[{group.id}]",
        selected_columns=[
            "sentry.timestamp",
        ],  # TODO: Need to pass tagKey columns in here?
        equations=[],
        orderby=None,
        offset=0,
        limit=99999,
        referrer=referrer,
        config=SearchResolverConfig(auto_fields=True),
        sampling_mode="NORMAL",
    )

    debug_log(
        "HERE'S THE RAW RESPONSE FROM TABLE READ",
        str(response),
    )

    # --- Approach 2: GetTraces + one GetTrace per trace ---------------------
    # TODO delete dupes
    now = datetime.now(UTC)
    params = SnubaParams(
        start=now - timedelta(days=30),
        end=now,
        projects=[group.project],
        organization=group.project.organization,
    )

    resolver = SearchResolver(
        params=params,
        config=SearchResolverConfig(auto_fields=True),
        definitions=OCCURRENCE_DEFINITIONS,
    )
    # The resolved filter is not used yet; the group filter is built by hand
    # below. The call is kept so behaviour matches the original exploration.
    _query_filter, _, _ = resolver.resolve_query(
        None
        # f"group_id:{group.id}",
    )
    meta = resolver.resolve_meta(referrer=referrer)
    meta.trace_item_type = TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE

    group_filter = TraceItemFilter(
        comparison_filter=ComparisonFilter(
            key=AttributeKey(name="group_id", type=AttributeKey.Type.TYPE_INT),
            op=ComparisonFilter.OP_IN,
            value=AttributeValue(val_int_array=IntArray(values=[group.id])),
        )
    )
    get_traces_request = GetTracesRequest(
        meta=meta,
        page_token=PageToken(offset=0),
        limit=9999,
        filters=[
            GetTracesRequest.TraceFilter(
                item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE,
                filter=group_filter,
            )
        ],
        order_by=[],
        attributes=[
            TraceAttribute(key=TraceAttribute.Key.KEY_TRACE_ID),
        ],
    )

    get_traces_response = snuba_rpc.get_traces_rpc(get_traces_request)
    # Only KEY_TRACE_ID was requested, so the first attribute is the trace id.
    trace_ids = [t.attributes[0].value.val_str for t in get_traces_response.traces]
    debug_log(
        f"FOUND GET TRACES RESPONSE with {len(trace_ids)} TRACES:",
        str(trace_ids),
    )

    # tag name -> tag value -> number of occurrence items carrying that value.
    tag_to_values_to_counts: defaultdict[str, defaultdict[str, int]] = defaultdict(
        lambda: defaultdict(int)
    )

    # THIS WORKS BUT IS NOT IDEAL (one RPC round-trip per trace id)
    for trace_id in trace_ids:
        get_trace_request = GetTraceRequest(
            meta=meta,
            trace_id=trace_id,
            # when this is None we just get the default limit
            limit=99999999,
            items=[
                GetTraceRequest.TraceItem(
                    item_type=TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE,
                    attributes=[],
                )
            ],
        )
        get_trace_response = snuba_rpc.get_trace_rpc(get_trace_request)
        trace_item_groups = get_trace_response.item_groups
        # A single item type was requested, so a single group is expected back.
        assert len(trace_item_groups) == 1
        assert trace_item_groups[0].item_type == TraceItemType.TRACE_ITEM_TYPE_OCCURRENCE

        for trace_item in trace_item_groups[0].items:
            for attribute in trace_item.attributes:
                if attribute.key.name.startswith("tags["):
                    # Strip the "tags[" prefix and the trailing character.
                    tag_name = attribute.key.name[5:-1]
                    assert (
                        attribute.key.type == AttributeKey.Type.TYPE_STRING
                    ), "TAGS of weird type?"
                    tag_value = attribute.value.val_str
                    tag_to_values_to_counts[tag_name][tag_value] += 1

    s = []
    for tag, value_counts in tag_to_values_to_counts.items():
        s.append(f"{tag}:")
        for value, count in value_counts.items():
            s.append(f"\t{value}: {count}")

    debug_log("All tags w/ value counts:", *s)
410+
411+
# TODO: PART 2: Get working response via table query (with attempt_to_get_tag_columns help)
412+
413+
161414
class SnubaTagStorage(TagStorage):
162415
key_column = "tags_key"
163416
value_column = "tags_value"
@@ -352,6 +605,7 @@ def __get_tag_keys_for_projects(
352605
metrics.incr("testing.tagstore.cache_tag_key.miss")
353606

354607
if result is None:
608+
debug_log("TAG KEYS", str(self.key_column), str(filters), str(aggregations))
355609
result = snuba.query(
356610
dataset=dataset,
357611
start=start,
@@ -725,6 +979,8 @@ def get_group_tag_keys_and_top_values(
725979
tenant_ids=None,
726980
**kwargs,
727981
):
982+
# This is the call.
983+
728984
# Similar to __get_tag_key_and_top_values except we get the top values
729985
# for all the keys provided. value_limit in this case means the number
730986
# of top values for each key, so the total rows returned should be
@@ -753,6 +1009,11 @@ def get_group_tag_keys_and_top_values(
7531009
["max", SEEN_COLUMN, "last_seen"],
7541010
]
7551011

1012+
# attempt_to_get_tag_values(group)
1013+
# x = eap_get_tags_names_for_group(group)
1014+
# debug_log("Tag names", str(x))
1015+
eap_get_tags_for_group(group)
1016+
7561017
values_by_key = snuba.query(
7571018
dataset=dataset,
7581019
start=kwargs.get("start"),

0 commit comments

Comments
 (0)