1+ import re
12from collections import defaultdict
23from typing import Any , Callable , Dict , Iterable
34
45from sentry_protos .snuba .v1 .endpoint_trace_item_table_pb2 import (
6+ Column ,
57 TraceItemColumnValues ,
68 TraceItemTableRequest ,
79)
8- from sentry_protos .snuba .v1 .trace_item_attribute_pb2 import AttributeKey , AttributeValue
10+ from sentry_protos .snuba .v1 .trace_item_attribute_pb2 import (
11+ AttributeKey ,
12+ AttributeValue ,
13+ Reliability ,
14+ )
915
16+ from snuba .settings import ENABLE_FORMULA_RELIABILITY_DEFAULT
17+ from snuba .state import get_int_config
1018from snuba .web .rpc .common .exceptions import BadSnubaRPCRequestException
1119from snuba .web .rpc .v1 .resolvers .common .aggregation import ExtrapolationContext
1220
1321
14- def convert_results (
15- request : TraceItemTableRequest , data : Iterable [Dict [str , Any ]]
16- ) -> list [TraceItemColumnValues ]:
17- converters : Dict [str , Callable [[Any ], AttributeValue ]] = {}
18-
19- for column in request .columns :
20- if column .HasField ("key" ):
21- if column .key .type == AttributeKey .TYPE_BOOLEAN :
22- converters [column .label ] = lambda x : AttributeValue (val_bool = bool (x ))
23- elif column .key .type == AttributeKey .TYPE_STRING :
24- converters [column .label ] = lambda x : AttributeValue (val_str = str (x ))
25- elif column .key .type == AttributeKey .TYPE_INT :
26- converters [column .label ] = lambda x : AttributeValue (val_int = int (x ))
27- elif column .key .type == AttributeKey .TYPE_FLOAT :
28- converters [column .label ] = lambda x : AttributeValue (val_float = float (x ))
29- elif column .key .type == AttributeKey .TYPE_DOUBLE :
30- converters [column .label ] = lambda x : AttributeValue (val_double = float (x ))
31- elif column .HasField ("conditional_aggregation" ):
32- converters [column .label ] = lambda x : AttributeValue (val_double = float (x ))
33- elif column .HasField ("formula" ):
22+ def _add_converter (
23+ column : Column , converters : Dict [str , Callable [[Any ], AttributeValue ]]
24+ ) -> None :
25+ if column .HasField ("key" ):
26+ if column .key .type == AttributeKey .TYPE_BOOLEAN :
27+ converters [column .label ] = lambda x : AttributeValue (val_bool = bool (x ))
28+ elif column .key .type == AttributeKey .TYPE_STRING :
29+ converters [column .label ] = lambda x : AttributeValue (val_str = str (x ))
30+ elif column .key .type == AttributeKey .TYPE_INT :
31+ converters [column .label ] = lambda x : AttributeValue (val_int = int (x ))
32+ elif column .key .type == AttributeKey .TYPE_FLOAT :
33+ converters [column .label ] = lambda x : AttributeValue (val_float = float (x ))
34+ elif column .key .type == AttributeKey .TYPE_DOUBLE :
3435 converters [column .label ] = lambda x : AttributeValue (val_double = float (x ))
3536 else :
3637 raise BadSnubaRPCRequestException (
37- "column is not one of: attribute, (conditional) aggregation, or formula "
38+ f"unknown attribute type: { AttributeKey . Type . Name ( column . key . type ) } "
3839 )
40+ elif column .HasField ("conditional_aggregation" ):
41+ converters [column .label ] = lambda x : AttributeValue (val_double = float (x ))
42+ elif column .HasField ("formula" ):
43+ converters [column .label ] = lambda x : AttributeValue (val_double = float (x ))
44+ if get_int_config (
45+ "enable_formula_reliability" , ENABLE_FORMULA_RELIABILITY_DEFAULT
46+ ):
47+ _add_converter (column .formula .left , converters )
48+ _add_converter (column .formula .right , converters )
49+ elif column .HasField ("literal" ):
50+ converters [column .label ] = lambda x : AttributeValue (val_double = float (x ))
51+ else :
52+ raise BadSnubaRPCRequestException (
53+ "column is not one of: attribute, (conditional) aggregation, or formula"
54+ )
55+
56+
57+ def get_converters_for_columns (
58+ columns : Iterable [Column ],
59+ ) -> Dict [str , Callable [[Any ], AttributeValue ]]:
60+ """
61+ Returns a dictionary of column labels to their corresponding converters.
62+ Converters are functions that convert a value returned by a clickhouse query to an AttributeValue.
63+ """
64+ converters : Dict [str , Callable [[Any ], AttributeValue ]] = {}
65+ for column in columns :
66+ _add_converter (column , converters )
67+ return converters
68+
69+
70+ def _is_sub_column (result_column_name : str , column : Column ) -> bool :
71+ """
72+ returns true if result_column_name is a sub column of column. false otherwise.
73+ """
74+ # this logic could theoretically cause issue if the user passes in such a column label to a non-subcolumn.
75+ # for now, we assume that the user will not do this.
76+ return bool (
77+ re .fullmatch (rf"{ re .escape (column .label )} (\.left|\.right)+" , result_column_name )
78+ )
79+
80+
81+ def _get_reliabilities_for_formula (
82+ column : Column , res : Dict [str , TraceItemColumnValues ]
83+ ) -> list [Reliability .ValueType ]:
84+ """
85+ Compute and return the reliabilities for the given formula column,
86+ based on the reliabilities of the left and right parts.
87+
88+ Ex:
89+ When users send a request with a formula such as sum(B)/min(B)
90+ we also separately query for sum(B), min(B) separately (earlier in the codebase).
91+ Thus, we already have the reliabilities for sum(B), min(B) in res labels as .left and .right.
92+ We use them in this function to compute the reliability of the formula, based on the following:
93+ a formula is reliable iff all of its parts are reliable (.left and .right)
94+ ex: (agg1 + agg2) / agg3 * agg4 is reliable iff agg1, agg2, agg3, agg4 are reliable.
95+
96+ Select A, sum(B)/min(B) AS agg GROUP BY A
97+ +----+--------------+----------+--------------+
98+ | A | agg | agg.left | agg.right |
99+ +----+--------------+----------+--------------+
100+ | A1 | reliable | reliable | reliable |
101+ | A2 | not reliable | reliable | not reliable |
102+ | A3 | reliable | reliable | reliable |
103+ +----+--------------+----------+--------------+
104+ you can see that each column has a reliability for each group by. and the reliabilities of agg is determined
105+ based on the reliabilities of agg.left and agg.right. In this case the function would return
106+ [reliable, not reliable, reliable]
107+ """
108+
109+ reliable_so_far : list [Reliability .ValueType ] = []
110+ for resname , resvalue in res .items ():
111+ if _is_sub_column (resname , column ):
112+ for i , reliability in enumerate (resvalue .reliabilities ):
113+ if len (reliable_so_far ) <= i :
114+ # bc we are extending as we go, it should only ever be 1 behind
115+ assert i == len (reliable_so_far )
116+ reliable_so_far .append (reliability )
117+ else :
118+ if reliability not in [
119+ Reliability .RELIABILITY_UNSPECIFIED ,
120+ Reliability .RELIABILITY_LOW ,
121+ Reliability .RELIABILITY_HIGH ,
122+ ]:
123+ raise ValueError (f"Invalid reliability: { reliability } " )
124+ reliable_so_far [i ] = min (reliable_so_far [i ], reliability )
125+ return reliable_so_far
126+
127+
128+ def convert_results (
129+ request : TraceItemTableRequest , data : Iterable [Dict [str , Any ]]
130+ ) -> list [TraceItemColumnValues ]:
131+ converters = get_converters_for_columns (request .columns )
39132
40133 res : defaultdict [str , TraceItemColumnValues ] = defaultdict (TraceItemColumnValues )
41134 for row in data :
@@ -53,6 +146,25 @@ def convert_results(
53146 extrapolation_context .reliability
54147 )
55148
149+ if get_int_config ("enable_formula_reliability" , ENABLE_FORMULA_RELIABILITY_DEFAULT ):
150+ # add formula reliabilities, remove the left and right parts
151+ for column in request .columns :
152+ if column .HasField ("formula" ) and column .label in res :
153+ # compute the reliabilities for the formula
154+ reliabilities = _get_reliabilities_for_formula (column , res )
155+ # set the reliabilities of the formula to be the ones we calculated
156+ while len (res [column .label ].reliabilities ) > 0 :
157+ res [column .label ].reliabilities .pop ()
158+ for e in reliabilities :
159+ assert e is not None
160+ res [column .label ].reliabilities .append (e )
161+
162+ # remove any columns that were not explicitly requested by the user in the request
163+ requested_column_labels = set (e .label for e in request .columns )
164+ to_delete = list (filter (lambda k : k not in requested_column_labels , res .keys ()))
165+ for name in to_delete :
166+ del res [name ]
167+
56168 column_ordering = {column .label : i for i , column in enumerate (request .columns )}
57169
58170 return list (
0 commit comments