Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies = [
"sentry-arroyo>=2.35.0",
"sentry-conventions>=0.3.0",
"sentry-kafka-schemas>=2.1.2",
"sentry-protos>=0.4.8",
"sentry-protos>=0.4.9",
"sentry-redis-tools>=0.5.1",
"sentry-relay>=0.9.19",
"sentry-sdk>=2.35.0",
Expand Down
18 changes: 9 additions & 9 deletions snuba/pipeline/stages/query_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
from snuba.query.data_source.simple import Entity, Table
from snuba.query.logical import EntityQuery
from snuba.query.logical import Query as LogicalQuery
from snuba.query.query_settings import QuerySettings
from snuba.request import Request


class EntityProcessingStage(
QueryPipelineStage[Request, ClickhouseQuery | CompositeQuery[Table]]
):
class EntityProcessingStage(QueryPipelineStage[Request, ClickhouseQuery | CompositeQuery[Table]]):
def _process_data(
self, pipe_input: QueryPipelineData[Request]
) -> ClickhouseQuery | CompositeQuery[Table]:
Expand All @@ -34,9 +33,7 @@ def _process_data(
return translated_storage_query

run_entity_validators(cast(EntityQuery, query), pipe_input.query_settings)
if isinstance(query, LogicalQuery) and isinstance(
query.get_from_clause(), Entity
):
if isinstance(query, LogicalQuery) and isinstance(query.get_from_clause(), Entity):
return run_entity_processing_executor(query, pipe_input.query_settings)
elif isinstance(query, CompositeQuery):
# if we were not able to translate the storage query earlier and we got to this point, this is
Expand All @@ -56,15 +53,18 @@ class StorageProcessingStage(
]
):
def _apply_default_subscriptable_mapping(
self, query: ClickhouseQuery | CompositeQuery[Table]
self, query: ClickhouseQuery | CompositeQuery[Table], query_settings: QuerySettings
) -> None:
query.transform_expressions(transform_subscriptables)
query.transform_expressions(
transform_subscriptables,
skip_transform_order_by=query_settings.get_skip_transform_order_by(),
)

def _process_data(
self, pipe_input: QueryPipelineData[ClickhouseQuery | CompositeQuery[Table]]
) -> ClickhouseQuery | CompositeQuery[Table]:
if pipe_input.query_settings.get_apply_default_subscriptable_mapping():
self._apply_default_subscriptable_mapping(pipe_input.data)
self._apply_default_subscriptable_mapping(pipe_input.data, pipe_input.query_settings)
if isinstance(pipe_input.data, ClickhouseQuery):
query_plan = build_best_plan(pipe_input.data, pipe_input.query_settings, [])
return apply_storage_processors(query_plan, pipe_input.query_settings)
Expand Down
62 changes: 19 additions & 43 deletions snuba/query/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ def get_from_clause(self) -> DataSource:
def get_selected_columns(self) -> Sequence[SelectedExpression]:
return self.__selected_columns

def set_ast_selected_columns(
self, selected_columns: Sequence[SelectedExpression]
) -> None:
def set_ast_selected_columns(self, selected_columns: Sequence[SelectedExpression]) -> None:
self.__selected_columns = selected_columns

def get_groupby(self) -> Sequence[Expression]:
Expand All @@ -177,9 +175,7 @@ def add_condition_to_ast(self, condition: Expression) -> None:
if not self.__condition:
self.__condition = condition
else:
self.__condition = binary_condition(
BooleanFunctions.AND, condition, self.__condition
)
self.__condition = binary_condition(BooleanFunctions.AND, condition, self.__condition)

def get_arrayjoin(self) -> Optional[Sequence[Expression]]:
return self.__array_join
Expand Down Expand Up @@ -263,24 +259,18 @@ def get_all_expressions(self) -> Iterable[Expression]:
deduplicate any of the expressions found.
"""
return chain(
chain.from_iterable(
map(lambda selected: selected.expression, self.__selected_columns)
),
chain.from_iterable(map(lambda selected: selected.expression, self.__selected_columns)),
self.__array_join or [],
self.__condition or [],
chain.from_iterable(self.__groupby),
self.__having or [],
chain.from_iterable(
map(lambda orderby: orderby.expression, self.__order_by)
),
chain.from_iterable(map(lambda orderby: orderby.expression, self.__order_by)),
self.__limitby.columns if self.__limitby else [],
self._get_expressions_impl(),
)

@abstractmethod
def _transform_expressions_impl(
self, func: Callable[[Expression], Expression]
) -> None:
def _transform_expressions_impl(self, func: Callable[[Expression], Expression]) -> None:
"""
Applies the transformation function to all the nodes added to the
query by the children of this class.
Expand All @@ -295,6 +285,7 @@ def transform_expressions(
func: Callable[[Expression], Expression],
skip_transform_condition: bool = False,
skip_array_join: bool = False,
skip_transform_order_by: bool = False,
) -> None:
"""
Transforms in place the current query object by applying a transformation
Expand All @@ -315,9 +306,7 @@ def transform_expression_list(

self.__selected_columns = list(
map(
lambda selected: replace(
selected, expression=selected.expression.transform(func)
),
lambda selected: replace(selected, expression=selected.expression.transform(func)),
self.__selected_columns,
)
)
Expand All @@ -328,19 +317,16 @@ def transform_expression_list(
]

if not skip_transform_condition:
self.__condition = (
self.__condition.transform(func) if self.__condition else None
)
self.__condition = self.__condition.transform(func) if self.__condition else None
self.__groupby = transform_expression_list(self.__groupby)
self.__having = self.__having.transform(func) if self.__having else None
self.__order_by = list(
map(
lambda clause: replace(
clause, expression=clause.expression.transform(func)
),
self.__order_by,
if not skip_transform_order_by:
self.__order_by = list(
map(
lambda clause: replace(clause, expression=clause.expression.transform(func)),
self.__order_by,
)
)
)

if self.__limitby is not None:
self.__limitby = LimitBy(
Expand Down Expand Up @@ -371,26 +357,20 @@ def transform(self, visitor: ExpressionVisitor[Expression]) -> None:

self.__selected_columns = list(
map(
lambda selected: replace(
selected, expression=selected.expression.accept(visitor)
),
lambda selected: replace(selected, expression=selected.expression.accept(visitor)),
self.__selected_columns,
)
)
if self.__array_join is not None:
self.__array_join = [
join_element.accept(visitor) for join_element in self.__array_join
]
self.__array_join = [join_element.accept(visitor) for join_element in self.__array_join]
if self.__condition is not None:
self.__condition = self.__condition.accept(visitor)
self.__groupby = [e.accept(visitor) for e in (self.__groupby or [])]
if self.__having is not None:
self.__having = self.__having.accept(visitor)
self.__order_by = list(
map(
lambda clause: replace(
clause, expression=clause.expression.accept(visitor)
),
lambda clause: replace(clause, expression=clause.expression.accept(visitor)),
self.__order_by,
)
)
Expand All @@ -410,9 +390,7 @@ def __get_all_ast_referenced_expressions(
return ret

def get_all_ast_referenced_columns(self) -> Set[Column]:
return self.__get_all_ast_referenced_expressions(
self.get_all_expressions(), Column
)
return self.__get_all_ast_referenced_expressions(self.get_all_expressions(), Column)

def get_all_ast_referenced_subscripts(self) -> Set[SubscriptableReference]:
return self.__get_all_ast_referenced_expressions(
Expand Down Expand Up @@ -448,9 +426,7 @@ def validate_aliases(self) -> bool:
if e.alias:
if isinstance(e, Column):
qualified_col_name = (
e.column_name
if not e.table_name
else f"{e.table_name}.{e.column_name}"
e.column_name if not e.table_name else f"{e.table_name}.{e.column_name}"
)
referenced_symbols.add(qualified_col_name)
if e.alias != qualified_col_name:
Expand Down
6 changes: 5 additions & 1 deletion snuba/query/processors/physical/type_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ def __init__(self, columns: Set[str], optimize_ordering: bool = False):
)

def process_query(self, query: Query, query_settings: QuerySettings) -> None:
query.transform_expressions(self._process_expressions, skip_transform_condition=True)
query.transform_expressions(
self._process_expressions,
skip_transform_condition=True,
skip_transform_order_by=query_settings.get_skip_transform_order_by(),
)

condition = query.get_condition()
if condition is not None:
Expand Down
15 changes: 15 additions & 0 deletions snuba/query/query_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ def get_asynchronous(self) -> bool:
def get_apply_default_subscriptable_mapping(self) -> bool:
pass

@abstractmethod
def get_skip_transform_order_by(self) -> bool:
pass


# TODO: I don't like that there are two different classes for the same thing
# this could probably be replaces with a `source` attribute on the class
Expand All @@ -85,6 +89,7 @@ def __init__(
referrer: str = "unknown",
asynchronous: bool = False,
apply_default_subscriptable_mapping: bool = True,
skip_transform_order_by: bool = False,
) -> None:
super().__init__()
self.__tier = Tier.TIER_NO_TIER
Expand All @@ -99,6 +104,7 @@ def __init__(
self.referrer = referrer
self.__asynchronous = asynchronous
self.__apply_default_subscriptable_mapping = apply_default_subscriptable_mapping
self.__skip_transform_order_by = skip_transform_order_by

def get_turbo(self) -> bool:
return self.__turbo
Expand Down Expand Up @@ -136,6 +142,12 @@ def get_asynchronous(self) -> bool:
def get_apply_default_subscriptable_mapping(self) -> bool:
return self.__apply_default_subscriptable_mapping

def get_skip_transform_order_by(self) -> bool:
    # Accessor for the flag set in __init__ (or via the setter below) that
    # tells transform_expressions() to leave the ORDER BY clause untouched.
    return self.__skip_transform_order_by

def set_skip_transform_order_by(self, value: bool) -> None:
    # Mutator allowing the flag to be toggled after construction; consumed
    # by query-processing stages via get_skip_transform_order_by().
    self.__skip_transform_order_by = value

"""
Tiers indicate which storage tier to direct the query to. This is used by the TimeSeriesEndpoint and the TraceItemTableEndpoint
in EAP.
Expand Down Expand Up @@ -222,3 +234,6 @@ def get_asynchronous(self) -> bool:

def get_apply_default_subscriptable_mapping(self) -> bool:
return True

def get_skip_transform_order_by(self) -> bool:
    # This settings class stores no opt-out flag, so the ORDER BY clause is
    # always transformed. NOTE(review): enclosing class is outside this
    # view -- presumably the subscription-query settings; confirm.
    return False
24 changes: 23 additions & 1 deletion snuba/web/rpc/common/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from datetime import datetime, timedelta, timezone
from typing import Callable, TypeVar, cast
from typing import Any, Callable, TypeVar, cast

from google.protobuf.message import Message as ProtobufMessage
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta
Expand Down Expand Up @@ -49,6 +50,27 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression:
Tin = TypeVar("Tin", bound=ProtobufMessage)
Tout = TypeVar("Tout", bound=ProtobufMessage)

BUCKET_COUNT = 40


def transform_array_value(value: dict[str, str]) -> Any:
    """Convert one ``{type_tag: raw_string}`` array element to a Python value.

    The input is expected to hold exactly one entry whose key names the value
    type (``"Int"``, ``"Double"``, ``"String"`` or ``"Bool"``) and whose value
    is the raw string representation.

    Raises:
        BadSnubaRPCRequestException: if the type tag is unknown, or the
            mapping is empty (no tag at all).
    """
    for type_tag, raw in value.items():
        if type_tag == "Int":
            return int(raw)
        if type_tag == "Double":
            return float(raw)
        if type_tag in {"String", "Bool"}:
            # Bool values are passed through as their raw string form,
            # matching the original behavior.
            return raw
        # Bug fix: report the unknown *tag*. The old message used type(v),
        # which is always <class 'str'> and hid the actual offending tag.
        raise BadSnubaRPCRequestException(f"array value type unknown: {type_tag}")
    # Previously an empty mapping fell through and returned None silently;
    # surface it as an explicit request error instead.
    raise BadSnubaRPCRequestException("array value has no type tag")


def process_arrays(raw: str) -> dict[str, list[Any]]:
    """Decode a JSON-encoded mapping of array attributes.

    ``raw`` is a JSON object mapping attribute keys to lists of typed value
    wrappers; each wrapper is converted via ``transform_array_value``.
    A ``null`` or empty payload yields an empty mapping.
    """
    decoded = json.loads(raw) or {}
    return {
        attr_key: [transform_array_value(wrapper) for wrapper in wrappers]
        for attr_key, wrappers in decoded.items()
    }


def _check_non_string_values_cannot_ignore_case(
comparison_filter: ComparisonFilter,
Expand Down
Loading
Loading