140 changes: 112 additions & 28 deletions snuba/web/rpc/v1/endpoint_get_trace.py
@@ -1,3 +1,4 @@
import json
import random
import uuid
from datetime import datetime
@@ -12,7 +13,11 @@
GetTraceResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
Array,
AttributeKey,
AttributeValue,
)
from sentry_protos.snuba.v1.trace_item_filter_pb2 import (
AndFilter,
ComparisonFilter,
@@ -58,6 +63,8 @@
"project_id",
"trace_id",
"sampling_factor",
"attributes_bool",
"attributes_int",
]
APPLY_FINAL_ROLLOUT_PERCENTAGE_CONFIG_KEY = "EndpointGetTrace.apply_final_rollout_percentage"

@@ -204,21 +211,29 @@ def _build_query(
else:
selected_columns += [
SelectedExpression(
name=("attributes_string"),
name="attributes_string",
expression=FunctionCall(
("attributes_string"),
"mapConcat",
tuple(column(f"attributes_string_{i}") for i in range(40)),
),
),
SelectedExpression(
name=("attributes_float"),
name="attributes_float",
expression=FunctionCall(
("attributes_float"),
"mapConcat",
tuple(column(f"attributes_float_{i}") for i in range(40)),
),
),
SelectedExpression(
name="attributes_array",
expression=FunctionCall(
"attributes_array",
"toJSONString",
(column("attributes_array"),),
),
),
]
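# Roughly speaking (a sketch of how these SelectedExpressions render to
# ClickHouse SQL; the exact rendering is an assumption), the 40 per-type
# attribute map buckets are merged with mapConcat, and the array column is
# serialized with toJSONString so it survives the JSON result encoding:
#
#   mapConcat(attributes_string_0, ..., attributes_string_39) AS attributes_string,
#   mapConcat(attributes_float_0, ..., attributes_float_39) AS attributes_float,
#   toJSONString(attributes_array) AS attributes_array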
selected_columns.extend(
map(
@@ -363,46 +378,81 @@ def _build_snuba_request(
)


def convert_to_attribute_value(value: Any) -> AttributeValue:
if isinstance(value, bool):
return AttributeValue(
val_bool=value,
)
elif isinstance(value, int):
return AttributeValue(
val_int=value,
)
elif isinstance(value, float):
return AttributeValue(
val_double=value,
)
elif isinstance(value, str):
return AttributeValue(
val_str=value,
)
elif isinstance(value, list):
return AttributeValue(
val_array=Array(values=[convert_to_attribute_value(v) for v in value])
)
elif isinstance(value, datetime):
return AttributeValue(
val_double=value.timestamp(),
)
else:
raise BadSnubaRPCRequestException(f"data type unknown: {type(value)}")


def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeValue]:
if isinstance(value, int):
if isinstance(value, bool):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_INT,
type=AttributeKey.Type.TYPE_BOOLEAN,
),
AttributeValue(
val_int=value,
convert_to_attribute_value(value),
)
elif isinstance(value, int):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_INT,
),
convert_to_attribute_value(value),
)
elif isinstance(value, float):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_DOUBLE,
),
AttributeValue(
val_double=value,
),
convert_to_attribute_value(value),
)
elif isinstance(value, str):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_STRING,
),
AttributeValue(
val_str=value,
),
convert_to_attribute_value(value),
)
elif isinstance(value, list):
return (
AttributeKey(name=key, type=AttributeKey.Type.TYPE_ARRAY),
convert_to_attribute_value(value),
)
elif isinstance(value, datetime):
return (
AttributeKey(
name=key,
type=AttributeKey.Type.TYPE_DOUBLE,
),
AttributeValue(
val_double=value.timestamp(),
),
convert_to_attribute_value(value),
)
else:
raise BadSnubaRPCRequestException(f"data type unknown: {type(value)}")
@@ -418,6 +468,27 @@ def _value_to_attribute(key: str, value: Any) -> tuple[AttributeKey, AttributeValue]:
)


def _transform_array_value(value: dict[str, str]) -> Any:
for t, v in value.items():
if t == "Int":
return int(v)
if t == "Bool":
# Values arrive as JSON-serialized strings, so compare against the
# literal instead of truthiness-checking (bool("false") is True).
return v == "true"
if t == "Double":
return float(v)
if t == "String":
return v
raise BadSnubaRPCRequestException(f"array value type unknown: {value}")


def _process_arrays(raw: str) -> dict[str, list[Any]]:
parsed = json.loads(raw)
arrays = {}
for key, values in parsed.items():
arrays[key] = [_transform_array_value(v) for v in values]
return arrays
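# Example (the payload shape is an assumption inferred from the
# toJSONString(attributes_array) projection above): the raw column is a JSON
# object mapping each attribute key to a list of single-entry
# {type_tag: string_value} dicts.
#
#   _process_arrays('{"tags": [{"String": "checkout"}, {"Int": "2"}]}')
#   -> {"tags": ["checkout", 2]}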


def _process_results(
data: Iterable[Dict[str, Any]],
) -> ProcessedResults:
@@ -433,6 +504,11 @@ def _process_results(
for row in data:
id = row.pop("id")
ts = row.pop("timestamp")
arrays = row.pop("attributes_array", "{}")
# These are merged in after the generic attributes so that the typed
# bool/int values overwrite any float attribute with the same name.
booleans = row.pop("attributes_bool", {})
integers = row.pop("attributes_int", {})
last_seen_timestamp_precise = float(ts)
last_seen_id = id

@@ -441,29 +517,37 @@ def _process_results(
# then transform to nanoseconds
timestamp.FromNanoseconds(int(ts * 1e6) * 1000)

attributes: list[GetTraceResponse.Item.Attribute] = []
attributes: dict[str, GetTraceResponse.Item.Attribute] = {}

def add_attribute(key: str, value: Any) -> None:
attribute_key, attribute_value = _value_to_attribute(key, value)
attributes.append(
GetTraceResponse.Item.Attribute(
key=attribute_key,
value=attribute_value,
)
attributes[key] = GetTraceResponse.Item.Attribute(
key=attribute_key,
value=attribute_value,
)

for key, value in row.items():
if isinstance(value, dict):
for k, v in value.items():
add_attribute(k, v)
for row_key, row_value in row.items():
if isinstance(row_value, dict):
for column_key, column_value in row_value.items():
add_attribute(column_key, column_value)
else:
add_attribute(key, value)
add_attribute(row_key, row_value)

attributes_array = _process_arrays(arrays)
for array_key, array_value in attributes_array.items():
add_attribute(array_key, array_value)

for bool_key, bool_value in booleans.items():
add_attribute(bool_key, bool_value)

for int_key, int_value in integers.items():
add_attribute(int_key, int_value)
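# Overwrite semantics in a sketch (hypothetical row): because the typed maps
# are merged last, a bool/int attribute replaces a float attribute of the
# same name that was read out of attributes_float.
#
#   attributes_float = {"retries": 2.0}, attributes_int = {"retries": 2}
#   -> attributes["retries"].value.val_int == 2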

item = GetTraceResponse.Item(
id=id,
timestamp=timestamp,
attributes=sorted(
attributes,
attributes.values(),
key=attrgetter("key.name"),
),
)
12 changes: 12 additions & 0 deletions tests/datasets/configuration/test_storage_loader.py
@@ -5,6 +5,7 @@
from typing import Any

from snuba.clickhouse.columns import (
JSON,
Array,
Bool,
Column,
@@ -196,6 +197,13 @@ def test_column_parser(self) -> None:
"schema_modifiers": ["nullable"],
},
},
{
"name": "json_col",
"type": "JSON",
"args": {
"max_dynamic_paths": 128,
},
},
]

expected_python_columns = [
@@ -222,6 +230,10 @@
SchemaModifiers(nullable=True, readonly=False),
),
),
Column(
"json_col",
JSON(max_dynamic_paths=128),
),
]

assert parse_columns(serialized_columns) == expected_python_columns
70 changes: 41 additions & 29 deletions tests/web/rpc/v1/test_endpoint_get_trace.py
@@ -21,7 +21,11 @@
ResponseMeta,
TraceItemType,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
Array,
AttributeKey,
AttributeValue,
)
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem

from snuba import state
@@ -79,62 +83,70 @@
for i in range(10)
]


_PROTOBUF_TO_SENTRY_PROTOS = {
_PROTOBUF_TO_SENTRY_PROTOS: dict[str, tuple[str, AttributeKey.Type.ValueType]] = {
"string_value": ("val_str", AttributeKey.Type.TYPE_STRING),
"double_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE),
# we store integers as double
"int_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE),
# we store boolean as double
"bool_value": ("val_double", AttributeKey.Type.TYPE_DOUBLE),
"int_value": ("val_int", AttributeKey.Type.TYPE_INT),
"bool_value": ("val_bool", AttributeKey.Type.TYPE_BOOLEAN),
"array_value": ("val_array", AttributeKey.Type.TYPE_ARRAY),
}
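# e.g. an AnyValue oneof field name maps onto the AttributeValue field and
# the AttributeKey type expected in the response:
#   _PROTOBUF_TO_SENTRY_PROTOS["bool_value"] == ("val_bool", AttributeKey.Type.TYPE_BOOLEAN)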


def get_attributes(
span: TraceItem,
) -> list[GetTraceResponse.Item.Attribute]:
attributes: list[GetTraceResponse.Item.Attribute] = [
GetTraceResponse.Item.Attribute(
attributes: dict[str, GetTraceResponse.Item.Attribute] = {
"sampling_factor": GetTraceResponse.Item.Attribute(
key=AttributeKey(
name="sampling_factor",
type=AttributeKey.Type.TYPE_DOUBLE,
),
value=AttributeValue(val_double=1.0),
),
]
}

for key in {"organization_id", "project_id", "trace_id"}:
attribute_key, attribute_value = _value_to_attribute(key, getattr(span, key))
attributes.append(
GetTraceResponse.Item.Attribute(
key=attribute_key,
value=attribute_value,
)
attributes[key] = GetTraceResponse.Item.Attribute(
key=attribute_key,
value=attribute_value,
)
for key, value in span.attributes.items():

def _convert_to_attribute_value(value: AnyValue) -> AttributeValue:
value_type = value.WhichOneof("value")
if value_type:
attribute_key = AttributeKey(
name=key,
type=_PROTOBUF_TO_SENTRY_PROTOS[value_type][1],
)
args = {_PROTOBUF_TO_SENTRY_PROTOS[value_type][0]: getattr(value, value_type)}
arg_name = _PROTOBUF_TO_SENTRY_PROTOS[str(value_type)][0]
arg_value = getattr(value, value_type)
if value_type == "array_value":
arg_value = Array(values=[_convert_to_attribute_value(v) for v in arg_value.values])
args = {arg_name: arg_value}
else:
continue
args = {"is_null": True}

attribute_value = AttributeValue(**args)
attributes.append(
GetTraceResponse.Item.Attribute(
key=attribute_key,
value=attribute_value,
)
return AttributeValue(**args)

for key, value in span.attributes.items():
value_type = value.WhichOneof("value")
if not value_type:
continue
attribute_key = AttributeKey(
name=key,
type=_PROTOBUF_TO_SENTRY_PROTOS[str(value_type)][1],
)
return attributes
attribute_value = _convert_to_attribute_value(value)
attributes[key] = GetTraceResponse.Item.Attribute(
key=attribute_key,
value=attribute_value,
)
return list(attributes.values())


@pytest.fixture(autouse=False)
def setup_teardown(clickhouse_db: None, redis_db: None) -> None:
state.set_config("eap_items_consumer_insert_arrays", "1")

items_storage = get_storage(StorageKey("eap_items"))

write_raw_unprocessed_events(items_storage, _SPANS) # type: ignore
write_raw_unprocessed_events(items_storage, _LOGS) # type: ignore
