diff --git a/snuba/web/rpc/v1/endpoint_get_trace.py b/snuba/web/rpc/v1/endpoint_get_trace.py
index 5269af2b016..1e87f3f686b 100644
--- a/snuba/web/rpc/v1/endpoint_get_trace.py
+++ b/snuba/web/rpc/v1/endpoint_get_trace.py
@@ -1,3 +1,4 @@
+import json
 import random
 import uuid
 from datetime import datetime
@@ -12,7 +13,11 @@
     GetTraceResponse,
 )
 from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType
-from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
+from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
+    Array,
+    AttributeKey,
+    AttributeValue,
+)
 from sentry_protos.snuba.v1.trace_item_filter_pb2 import (
     AndFilter,
     ComparisonFilter,
@@ -58,6 +63,8 @@
     "project_id",
     "trace_id",
     "sampling_factor",
+    "attributes_bool",
+    "attributes_int",
 ]
 
 APPLY_FINAL_ROLLOUT_PERCENTAGE_CONFIG_KEY = "EndpointGetTrace.apply_final_rollout_percentage"
@@ -204,7 +211,7 @@ def _build_query(
     else:
         selected_columns += [
             SelectedExpression(
-                name=("attributes_string"),
+                name="attributes_string",
                 expression=FunctionCall(
                     ("attributes_string"),
                     "mapConcat",
@@ -212,13 +219,21 @@ def _build_query(
                 ),
             ),
             SelectedExpression(
-                name=("attributes_float"),
+                name="attributes_float",
                 expression=FunctionCall(
                     ("attributes_float"),
                     "mapConcat",
                     tuple(column(f"attributes_float_{i}") for i in range(40)),
                 ),
             ),
+            SelectedExpression(
+                name="attributes_array",
+                expression=FunctionCall(
+                    "attributes_array",
+                    "toJSONString",
+                    (column("attributes_array"),),
+                ),
+            ),
         ]
         selected_columns.extend(
             map(
@@ -363,16 +378,60 @@ def _build_snuba_request(
     )
 
 
+def convert_to_attribute_value(value: Any) -> AttributeValue:
+    """Wrap a Python value in the ``AttributeValue`` field matching its type.
+
+    ``bool`` is tested before ``int`` because ``bool`` is a subclass of ``int``.
+    Lists are converted element by element; datetimes become POSIX timestamps
+    stored as doubles.
+
+    Raises:
+        BadSnubaRPCRequestException: if the value's type is not supported.
+    """
+    if isinstance(value, bool):
+        return AttributeValue(
+            val_bool=value,
+        )
+    elif isinstance(value, int):
+        return AttributeValue(
+            val_int=value,
+        )
+    elif isinstance(value, float):
+        return AttributeValue(
+            val_double=value,
+        )
+    elif isinstance(value, str):
+        return AttributeValue(
+            val_str=value,
+        )
+    elif isinstance(value, list):
+        return AttributeValue(
+            val_array=Array(values=[convert_to_attribute_value(v) for v in value])
+        )
+    elif isinstance(value, datetime):
+        return AttributeValue(
+            val_double=value.timestamp(),
+        )
+    else:
+        raise BadSnubaRPCRequestException(f"data type unknown: {type(value)}")
+
+
 def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeValue]:
-    if isinstance(value, int):
+    if isinstance(value, bool):
         return (
             AttributeKey(
                 name=key,
-                type=AttributeKey.Type.TYPE_INT,
+                type=AttributeKey.Type.TYPE_BOOLEAN,
             ),
-            AttributeValue(
-                val_int=value,
+            convert_to_attribute_value(value),
+        )
+    elif isinstance(value, int):
+        return (
+            AttributeKey(
+                name=key,
+                type=AttributeKey.Type.TYPE_INT,
             ),
+            convert_to_attribute_value(value),
         )
     elif isinstance(value, float):
         return (
@@ -380,9 +439,7 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa
                 name=key,
                 type=AttributeKey.Type.TYPE_DOUBLE,
             ),
-            AttributeValue(
-                val_double=value,
-            ),
+            convert_to_attribute_value(value),
         )
     elif isinstance(value, str):
         return (
@@ -390,9 +447,12 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa
                 name=key,
                 type=AttributeKey.Type.TYPE_STRING,
             ),
-            AttributeValue(
-                val_str=value,
-            ),
+            convert_to_attribute_value(value),
+        )
+    elif isinstance(value, list):
+        return (
+            AttributeKey(name=key, type=AttributeKey.Type.TYPE_ARRAY),
+            convert_to_attribute_value(value),
         )
     elif isinstance(value, datetime):
         return (
@@ -400,9 +460,7 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa
                 name=key,
                 type=AttributeKey.Type.TYPE_DOUBLE,
             ),
-            AttributeValue(
-                val_double=value.timestamp(),
-            ),
+            convert_to_attribute_value(value),
         )
     else:
         raise BadSnubaRPCRequestException(f"data type unknown: {type(value)}")
@@ -418,6 +476,40 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeVa
 )
 
 
+def _transform_array_value(value: dict[str, Any]) -> Any:
+    """Decode one type-tagged array element into a plain Python value.
+
+    NOTE(review): assumes elements look like ``{"Int": "1"}`` -- confirm
+    against what ``toJSONString(attributes_array)`` actually emits.
+
+    Raises:
+        BadSnubaRPCRequestException: if no recognised type tag is present.
+    """
+    for t, v in value.items():
+        if t == "Int":
+            return int(v)
+        if t == "Bool":
+            # bool("false") is True, so parse string payloads explicitly.
+            if isinstance(v, str):
+                return v.lower() not in ("", "0", "false")
+            return bool(v)
+        if t == "Double":
+            return float(v)
+        if t == "String":
+            return v
+    # Report the element itself; ``v`` is unbound when ``value`` is empty.
+    raise BadSnubaRPCRequestException(f"array value type unknown: {value!r}")
+
+
+def _process_arrays(raw: str) -> dict[str, list[Any]]:
+    """Decode the JSON string selected as ``attributes_array`` into lists."""
+    parsed = json.loads(raw)
+    return {
+        key: [_transform_array_value(v) for v in values]
+        for key, values in parsed.items()
+    }
+
+
 def _process_results(
     data: Iterable[Dict[str, Any]],
 ) -> ProcessedResults:
@@ -433,6 +525,11 @@ def _process_results(
     for row in data:
         id = row.pop("id")
         ts = row.pop("timestamp")
+        arrays = row.pop("attributes_array", "{}")
+        # We want to merge these values after to overwrite potential floats
+        # with the same name.
+        booleans = row.pop("attributes_bool", {})
+        integers = row.pop("attributes_int", {})
         last_seen_timestamp_precise = float(ts)
         last_seen_id = id
 
@@ -441,29 +538,37 @@ def _process_results(
         # then transform to nanoseconds
         timestamp.FromNanoseconds(int(ts * 1e6) * 1000)
 
-        attributes: list[GetTraceResponse.Item.Attribute] = []
+        attributes: dict[str, GetTraceResponse.Item.Attribute] = {}
 
         def add_attribute(key: str, value: Any) -> None:
             attribute_key, attribute_value = _value_to_attribute(key, value)
-            attributes.append(
-                GetTraceResponse.Item.Attribute(
-                    key=attribute_key,
-                    value=attribute_value,
-                )
+            attributes[key] = GetTraceResponse.Item.Attribute(
+                key=attribute_key,
+                value=attribute_value,
             )
 
-        for key, value in row.items():
-            if isinstance(value, dict):
-                for k, v in value.items():
-                    add_attribute(k, v)
+        for row_key, row_value in row.items():
+            if isinstance(row_value, dict):
+                for column_key, column_value in row_value.items():
+                    add_attribute(column_key, column_value)
             else:
-                add_attribute(key, value)
+                add_attribute(row_key, row_value)
+
+        attributes_array = _process_arrays(arrays)
+        for array_key, array_value in attributes_array.items():
+            add_attribute(array_key, array_value)
+
+        for bool_key, bool_value in booleans.items():
+            add_attribute(bool_key, bool_value)
+
+        for int_key, int_value in integers.items():
+            add_attribute(int_key, int_value)
 
         item = GetTraceResponse.Item(
             id=id,
             timestamp=timestamp,
             attributes=sorted(
-                attributes,
+                attributes.values(),
                 key=attrgetter("key.name"),
             ),
         )
diff --git a/tests/datasets/configuration/test_storage_loader.py b/tests/datasets/configuration/test_storage_loader.py
index d7d88588348..28afffd0f97 100644
--- a/tests/datasets/configuration/test_storage_loader.py
+++ b/tests/datasets/configuration/test_storage_loader.py
@@ -5,6 +5,7 @@
 from typing import Any
 
 from snuba.clickhouse.columns import (
+    JSON,
     Array,
     Bool,
     Column,
@@ -196,6 +197,13 @@ def test_column_parser(self) -> None:
                     "schema_modifiers": ["nullable"],
                 },
             },
+            {
+                "name": "json_col",
+                "type": "JSON",
+                "args": {
+                    "max_dynamic_paths": 128,
+                },
+            },
         ]
 
         expected_python_columns = [
@@ -222,6 +230,10 @@ def test_column_parser(self) -> None:
                     SchemaModifiers(nullable=True, readonly=False),
                 ),
             ),
+            Column(
+                "json_col",
+                JSON(max_dynamic_paths=128),
+            ),
         ]
 
         assert parse_columns(serialized_columns) == expected_python_columns
diff --git a/tests/web/rpc/v1/test_endpoint_get_trace.py b/tests/web/rpc/v1/test_endpoint_get_trace.py
index 788b6849abd..530fc688515 100644
--- a/tests/web/rpc/v1/test_endpoint_get_trace.py
+++ b/tests/web/rpc/v1/test_endpoint_get_trace.py
@@ -21,7 +21,11 @@
     ResponseMeta,
     TraceItemType,
 )
-from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
+from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
+    Array,
+    AttributeKey,
+    AttributeValue,
+)
 from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem
 
 from snuba import state
@@ -79,62 +83,70 @@
     for i in range(10)
 ]
 
-
-_PROTOBUF_TO_SENTRY_PROTOS = {
+_PROTOBUF_TO_SENTRY_PROTOS: dict[str, tuple[str, AttributeKey.Type.ValueType]] = {
     "string_value": ("val_str", AttributeKey.Type.TYPE_STRING),
     "double_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE),
-    # we store integers as double
-    "int_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE),
-    # we store boolean as double
-    "bool_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE),
+    "int_value": ("val_int", AttributeKey.Type.TYPE_INT),
+    "bool_value": ("val_bool", AttributeKey.Type.TYPE_BOOLEAN),
+    "array_value": ("val_array", AttributeKey.Type.TYPE_ARRAY),
 }
 
 
 def get_attributes(
     span: TraceItem,
 ) -> list[GetTraceResponse.Item.Attribute]:
-    attributes: list[GetTraceResponse.Item.Attribute] = [
-        GetTraceResponse.Item.Attribute(
+    attributes: dict[str, GetTraceResponse.Item.Attribute] = {
+        "sampling_factor": GetTraceResponse.Item.Attribute(
             key=AttributeKey(
                 name="sampling_factor",
                 type=AttributeKey.Type.TYPE_DOUBLE,
             ),
             value=AttributeValue(val_double=1.0),
         ),
-    ]
+    }
 
     for key in {"organization_id", "project_id", "trace_id"}:
         attribute_key, attribute_value = _value_to_attribute(key, getattr(span, key))
-        attributes.append(
-            GetTraceResponse.Item.Attribute(
-                key=attribute_key,
-                value=attribute_value,
-            )
+        attributes[key] = GetTraceResponse.Item.Attribute(
+            key=attribute_key,
+            value=attribute_value,
         )
-    for key, value in span.attributes.items():
+
+    def _convert_to_attribute_value(value: AnyValue) -> AttributeValue:
         value_type = value.WhichOneof("value")
         if value_type:
-            attribute_key = AttributeKey(
-                name=key,
-                type=_PROTOBUF_TO_SENTRY_PROTOS[value_type][1],
-            )
-            args = {_PROTOBUF_TO_SENTRY_PROTOS[value_type][0]: getattr(value, value_type)}
+            arg_name = _PROTOBUF_TO_SENTRY_PROTOS[str(value_type)][0]
+            arg_value = getattr(value, value_type)
+            if value_type == "array_value":
+                arg_value = Array(values=[_convert_to_attribute_value(v) for v in arg_value.values])
+            args = {arg_name: arg_value}
         else:
-            continue
+            args = {"is_null": True}
 
-        attribute_value = AttributeValue(**args)
-        attributes.append(
-            GetTraceResponse.Item.Attribute(
-                key=attribute_key,
-                value=attribute_value,
-            )
+        return AttributeValue(**args)
+
+    for key, value in span.attributes.items():
+        value_type = value.WhichOneof("value")
+        if not value_type:
+            continue
+        attribute_key = AttributeKey(
+            name=key,
+            type=_PROTOBUF_TO_SENTRY_PROTOS[str(value_type)][1],
         )
-    return attributes
+        attribute_value = _convert_to_attribute_value(value)
+        attributes[key] = GetTraceResponse.Item.Attribute(
+            key=attribute_key,
+            value=attribute_value,
+        )
+    return list(attributes.values())
 
 
 @pytest.fixture(autouse=False)
 def setup_teardown(clickhouse_db: None, redis_db: None) -> None:
+    state.set_config("eap_items_consumer_insert_arrays", "1")
+
     items_storage = get_storage(StorageKey("eap_items"))
+
     write_raw_unprocessed_events(items_storage, _SPANS)  # type: ignore
     write_raw_unprocessed_events(items_storage, _LOGS)  # type: ignore
 
diff --git a/tests/web/rpc/v1/test_utils.py b/tests/web/rpc/v1/test_utils.py
index 29b65f372ee..0647e795313 100644
--- a/tests/web/rpc/v1/test_utils.py
+++ b/tests/web/rpc/v1/test_utils.py
@@ -11,7 +11,7 @@
     OrFilter,
     TraceItemFilter,
 )
-from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem
+from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, ArrayValue, TraceItem
 
 from snuba.datasets.storages.factory import get_storage
 from snuba.datasets.storages.storage_key import StorageKey
@@ -69,6 +69,16 @@
     "transaction.method": AnyValue(string_value="POST"),
     "transaction.op": AnyValue(string_value="http.server"),
     "user": AnyValue(string_value="ip:127.0.0.1"),
+    "i_am_an_array": AnyValue(
+        array_value=ArrayValue(
+            values=[
+                AnyValue(int_value=1),
+                AnyValue(bool_value=True),
+                AnyValue(double_value=3.0),
+                AnyValue(string_value="blah"),
+            ]
+        )
+    ),
 }
 
 
@@ -89,16 +99,25 @@ def write_eap_item(
         count: the number of these spans to write.
     """
 
-    attributes: dict[str, AnyValue] = {}
-    for key, value in raw_attributes.items():
+    def convert_attribute_value(value: Any) -> AnyValue:
         if isinstance(value, str):
-            attributes[key] = AnyValue(string_value=value)
-        elif isinstance(value, int):
-            attributes[key] = AnyValue(int_value=value)
+            return AnyValue(string_value=value)
         elif isinstance(value, bool):
-            attributes[key] = AnyValue(bool_value=value)
-        else:
-            attributes[key] = AnyValue(double_value=value)
+            # Checked before int: bool is a subclass of int.
+            return AnyValue(bool_value=value)
+        elif isinstance(value, int):
+            return AnyValue(int_value=value)
+        elif isinstance(value, float):
+            return AnyValue(double_value=value)
+        elif isinstance(value, list):
+            return AnyValue(
+                array_value=ArrayValue(values=[convert_attribute_value(v) for v in value])
+            )
+        return AnyValue()
+
+    attributes: dict[str, AnyValue] = {}
+    for key, value in raw_attributes.items():
+        attributes[key] = convert_attribute_value(value)
 
     write_raw_unprocessed_events(
         get_storage(StorageKey("eap_items")),  # type: ignore