Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies = [
"sentry-arroyo>=2.35.0",
"sentry-conventions>=0.3.0",
"sentry-kafka-schemas>=2.1.2",
"sentry-protos>=0.4.8",
"sentry-protos>=0.4.9",
"sentry-redis-tools>=0.5.1",
"sentry-relay>=0.9.19",
"sentry-sdk>=2.35.0",
Expand Down
18 changes: 9 additions & 9 deletions snuba/pipeline/stages/query_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
from snuba.query.data_source.simple import Entity, Table
from snuba.query.logical import EntityQuery
from snuba.query.logical import Query as LogicalQuery
from snuba.query.query_settings import QuerySettings
from snuba.request import Request


class EntityProcessingStage(
QueryPipelineStage[Request, ClickhouseQuery | CompositeQuery[Table]]
):
class EntityProcessingStage(QueryPipelineStage[Request, ClickhouseQuery | CompositeQuery[Table]]):
def _process_data(
self, pipe_input: QueryPipelineData[Request]
) -> ClickhouseQuery | CompositeQuery[Table]:
Expand All @@ -34,9 +33,7 @@ def _process_data(
return translated_storage_query

run_entity_validators(cast(EntityQuery, query), pipe_input.query_settings)
if isinstance(query, LogicalQuery) and isinstance(
query.get_from_clause(), Entity
):
if isinstance(query, LogicalQuery) and isinstance(query.get_from_clause(), Entity):
return run_entity_processing_executor(query, pipe_input.query_settings)
elif isinstance(query, CompositeQuery):
# if we were not able to translate the storage query earlier and we got to this point, this is
Expand All @@ -56,15 +53,18 @@ class StorageProcessingStage(
]
):
def _apply_default_subscriptable_mapping(
self, query: ClickhouseQuery | CompositeQuery[Table]
self, query: ClickhouseQuery | CompositeQuery[Table], query_settings: QuerySettings
) -> None:
query.transform_expressions(transform_subscriptables)
query.transform_expressions(
transform_subscriptables,
skip_transform_order_by=query_settings.get_skip_transform_order_by(),
)

def _process_data(
self, pipe_input: QueryPipelineData[ClickhouseQuery | CompositeQuery[Table]]
) -> ClickhouseQuery | CompositeQuery[Table]:
if pipe_input.query_settings.get_apply_default_subscriptable_mapping():
self._apply_default_subscriptable_mapping(pipe_input.data)
self._apply_default_subscriptable_mapping(pipe_input.data, pipe_input.query_settings)
if isinstance(pipe_input.data, ClickhouseQuery):
query_plan = build_best_plan(pipe_input.data, pipe_input.query_settings, [])
return apply_storage_processors(query_plan, pipe_input.query_settings)
Expand Down
62 changes: 19 additions & 43 deletions snuba/query/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ def get_from_clause(self) -> DataSource:
def get_selected_columns(self) -> Sequence[SelectedExpression]:
return self.__selected_columns

def set_ast_selected_columns(
self, selected_columns: Sequence[SelectedExpression]
) -> None:
def set_ast_selected_columns(self, selected_columns: Sequence[SelectedExpression]) -> None:
self.__selected_columns = selected_columns

def get_groupby(self) -> Sequence[Expression]:
Expand All @@ -177,9 +175,7 @@ def add_condition_to_ast(self, condition: Expression) -> None:
if not self.__condition:
self.__condition = condition
else:
self.__condition = binary_condition(
BooleanFunctions.AND, condition, self.__condition
)
self.__condition = binary_condition(BooleanFunctions.AND, condition, self.__condition)

def get_arrayjoin(self) -> Optional[Sequence[Expression]]:
return self.__array_join
Expand Down Expand Up @@ -263,24 +259,18 @@ def get_all_expressions(self) -> Iterable[Expression]:
deduplicate any of the expressions found.
"""
return chain(
chain.from_iterable(
map(lambda selected: selected.expression, self.__selected_columns)
),
chain.from_iterable(map(lambda selected: selected.expression, self.__selected_columns)),
self.__array_join or [],
self.__condition or [],
chain.from_iterable(self.__groupby),
self.__having or [],
chain.from_iterable(
map(lambda orderby: orderby.expression, self.__order_by)
),
chain.from_iterable(map(lambda orderby: orderby.expression, self.__order_by)),
self.__limitby.columns if self.__limitby else [],
self._get_expressions_impl(),
)

@abstractmethod
def _transform_expressions_impl(
self, func: Callable[[Expression], Expression]
) -> None:
def _transform_expressions_impl(self, func: Callable[[Expression], Expression]) -> None:
"""
Applies the transformation function to all the nodes added to the
query by the children of this class.
Expand All @@ -295,6 +285,7 @@ def transform_expressions(
func: Callable[[Expression], Expression],
skip_transform_condition: bool = False,
skip_array_join: bool = False,
skip_transform_order_by: bool = False,
) -> None:
"""
Transforms in place the current query object by applying a transformation
Expand All @@ -315,9 +306,7 @@ def transform_expression_list(

self.__selected_columns = list(
map(
lambda selected: replace(
selected, expression=selected.expression.transform(func)
),
lambda selected: replace(selected, expression=selected.expression.transform(func)),
self.__selected_columns,
)
)
Expand All @@ -328,19 +317,16 @@ def transform_expression_list(
]

if not skip_transform_condition:
self.__condition = (
self.__condition.transform(func) if self.__condition else None
)
self.__condition = self.__condition.transform(func) if self.__condition else None
self.__groupby = transform_expression_list(self.__groupby)
self.__having = self.__having.transform(func) if self.__having else None
self.__order_by = list(
map(
lambda clause: replace(
clause, expression=clause.expression.transform(func)
),
self.__order_by,
if not skip_transform_order_by:
self.__order_by = list(
map(
lambda clause: replace(clause, expression=clause.expression.transform(func)),
self.__order_by,
)
)
)

if self.__limitby is not None:
self.__limitby = LimitBy(
Expand Down Expand Up @@ -371,26 +357,20 @@ def transform(self, visitor: ExpressionVisitor[Expression]) -> None:

self.__selected_columns = list(
map(
lambda selected: replace(
selected, expression=selected.expression.accept(visitor)
),
lambda selected: replace(selected, expression=selected.expression.accept(visitor)),
self.__selected_columns,
)
)
if self.__array_join is not None:
self.__array_join = [
join_element.accept(visitor) for join_element in self.__array_join
]
self.__array_join = [join_element.accept(visitor) for join_element in self.__array_join]
if self.__condition is not None:
self.__condition = self.__condition.accept(visitor)
self.__groupby = [e.accept(visitor) for e in (self.__groupby or [])]
if self.__having is not None:
self.__having = self.__having.accept(visitor)
self.__order_by = list(
map(
lambda clause: replace(
clause, expression=clause.expression.accept(visitor)
),
lambda clause: replace(clause, expression=clause.expression.accept(visitor)),
self.__order_by,
)
)
Expand All @@ -410,9 +390,7 @@ def __get_all_ast_referenced_expressions(
return ret

def get_all_ast_referenced_columns(self) -> Set[Column]:
return self.__get_all_ast_referenced_expressions(
self.get_all_expressions(), Column
)
return self.__get_all_ast_referenced_expressions(self.get_all_expressions(), Column)

def get_all_ast_referenced_subscripts(self) -> Set[SubscriptableReference]:
return self.__get_all_ast_referenced_expressions(
Expand Down Expand Up @@ -448,9 +426,7 @@ def validate_aliases(self) -> bool:
if e.alias:
if isinstance(e, Column):
qualified_col_name = (
e.column_name
if not e.table_name
else f"{e.table_name}.{e.column_name}"
e.column_name if not e.table_name else f"{e.table_name}.{e.column_name}"
)
referenced_symbols.add(qualified_col_name)
if e.alias != qualified_col_name:
Expand Down
6 changes: 5 additions & 1 deletion snuba/query/processors/physical/type_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ def __init__(self, columns: Set[str], optimize_ordering: bool = False):
)

def process_query(self, query: Query, query_settings: QuerySettings) -> None:
query.transform_expressions(self._process_expressions, skip_transform_condition=True)
query.transform_expressions(
self._process_expressions,
skip_transform_condition=True,
skip_transform_order_by=query_settings.get_skip_transform_order_by(),
)

condition = query.get_condition()
if condition is not None:
Expand Down
15 changes: 15 additions & 0 deletions snuba/query/query_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ def get_asynchronous(self) -> bool:
def get_apply_default_subscriptable_mapping(self) -> bool:
pass

@abstractmethod
def get_skip_transform_order_by(self) -> bool:
    # When True, expression transforms applied to the query (e.g. the default
    # subscriptable mapping and type-converter passes) should leave the
    # ORDER BY clause untouched. Concrete settings classes must implement this.
    pass


# TODO: I don't like that there are two different classes for the same thing
# this could probably be replaced with a `source` attribute on the class
Expand All @@ -85,6 +89,7 @@ def __init__(
referrer: str = "unknown",
asynchronous: bool = False,
apply_default_subscriptable_mapping: bool = True,
skip_transform_order_by: bool = False,
) -> None:
super().__init__()
self.__tier = Tier.TIER_NO_TIER
Expand All @@ -99,6 +104,7 @@ def __init__(
self.referrer = referrer
self.__asynchronous = asynchronous
self.__apply_default_subscriptable_mapping = apply_default_subscriptable_mapping
self.__skip_transform_order_by = skip_transform_order_by

def get_turbo(self) -> bool:
return self.__turbo
Expand Down Expand Up @@ -136,6 +142,12 @@ def get_asynchronous(self) -> bool:
def get_apply_default_subscriptable_mapping(self) -> bool:
return self.__apply_default_subscriptable_mapping

def get_skip_transform_order_by(self) -> bool:
    # Returns whether expression transforms should skip rewriting the
    # ORDER BY clause for this query (set at construction or via the setter).
    return self.__skip_transform_order_by

def set_skip_transform_order_by(self, value: bool) -> None:
    # Mutator counterpart of get_skip_transform_order_by: lets callers opt a
    # query out of (or back into) ORDER BY expression rewriting after the
    # settings object has been constructed.
    self.__skip_transform_order_by = value

"""
Tiers indicate which storage tier to direct the query to. This is used by the TimeSeriesEndpoint and the TraceItemTableEndpoint
in EAP.
Expand Down Expand Up @@ -222,3 +234,6 @@ def get_asynchronous(self) -> bool:

def get_apply_default_subscriptable_mapping(self) -> bool:
return True

def get_skip_transform_order_by(self) -> bool:
    # This settings variant never skips ORDER BY transforms; the flag is a
    # fixed default here, unlike the configurable HTTP settings class.
    # NOTE(review): enclosing class header is outside this view — confirm.
    return False
24 changes: 23 additions & 1 deletion snuba/web/rpc/common/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from datetime import datetime, timedelta, timezone
from typing import Callable, TypeVar, cast
from typing import Any, Callable, TypeVar, cast

from google.protobuf.message import Message as ProtobufMessage
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta
Expand Down Expand Up @@ -49,6 +50,27 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression:
Tin = TypeVar("Tin", bound=ProtobufMessage)
Tout = TypeVar("Tout", bound=ProtobufMessage)

BUCKET_COUNT = 40


def transform_array_value(value: dict[str, str]) -> Any:
    """Convert one tagged array element to a Python value.

    ``value`` is a single-entry mapping from a type tag to the raw string
    representation, e.g. ``{"Int": "5"}`` or ``{"Double": "2.5"}``.

    Returns:
        ``int`` for an ``"Int"`` tag, ``float`` for ``"Double"``, and the raw
        string unchanged for ``"String"`` and ``"Bool"`` (Bool is deliberately
        NOT coerced here — presumably handled downstream; TODO confirm).

    Raises:
        BadSnubaRPCRequestException: if no entry carries a recognized tag.
    """
    for type_tag, raw in value.items():
        if type_tag == "Int":
            return int(raw)
        if type_tag == "Double":
            return float(raw)
        if type_tag in {"String", "Bool"}:
            return raw
    # Original message interpolated type(v), which is always <class 'str'>
    # for dict[str, str] values and raised NameError on an empty dict.
    # Report the actual unrecognized tag(s) instead.
    raise BadSnubaRPCRequestException(
        f"array value type unknown: {sorted(value) or 'empty value'}"
    )


def process_arrays(raw: str) -> dict[str, list[Any]]:
    """Decode a JSON-encoded mapping of array columns into Python values.

    ``raw`` is a JSON object mapping a column name to a list of tagged
    elements (each a ``{type_tag: raw_string}`` dict); every element is
    converted via ``transform_array_value``. A JSON ``null`` payload yields
    an empty mapping.
    """
    decoded = json.loads(raw) or {}
    return {
        column: [transform_array_value(element) for element in elements]
        for column, elements in decoded.items()
    }


def _check_non_string_values_cannot_ignore_case(
comparison_filter: ComparisonFilter,
Expand Down
Loading
Loading