Skip to content

Commit 6d8de6d

Browse files
authored
feat: eap support formulas in timeseries endpoint (#6854)
This PR implements support for formulas in the timeseries endpoint. It closes getsentry/eap-planning#27. Major changes: * auto-convert `TimeSeriesRequest.aggregations` to `TimeSeriesRequest.expressions` * implement support for formulas. Tests: * a test for a non-extrapolated formula as well as an extrapolated one. Design decisions: * reliability doesn't work with formulas * formulas don't work with uptime checks or logs
1 parent 635e54a commit 6d8de6d

File tree

6 files changed

+294
-30
lines changed

6 files changed

+294
-30
lines changed

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ python-rapidjson==1.8
2929
redis==4.5.4
3030
sentry-arroyo==2.19.12
3131
sentry-kafka-schemas==1.0.4
32-
sentry-protos==0.1.58
32+
sentry-protos==0.1.59
3333
sentry-redis-tools==0.3.0
3434
sentry-relay==0.9.5
3535
sentry-sdk==2.18.0

snuba/web/rpc/v1/endpoint_time_series.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Type
44

55
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
6+
Expression,
67
TimeSeriesRequest,
78
TimeSeriesResponse,
89
)
@@ -75,6 +76,19 @@ def _validate_time_buckets(request: TimeSeriesRequest) -> None:
7576
)
7677

7778

79+
def _convert_aggregations_to_expressions(
80+
request: TimeSeriesRequest,
81+
) -> TimeSeriesRequest:
82+
if len(request.aggregations) > 0:
83+
new_req = TimeSeriesRequest()
84+
new_req.CopyFrom(request)
85+
new_req.ClearField("aggregations")
86+
for agg in request.aggregations:
87+
new_req.expressions.append(Expression(aggregation=agg, label=agg.label))
88+
return new_req
89+
return request
90+
91+
7892
class EndpointTimeSeries(RPCEndpoint[TimeSeriesRequest, TimeSeriesResponse]):
7993
@classmethod
8094
def version(cls) -> str:
@@ -107,5 +121,6 @@ def _execute(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
107121
raise BadSnubaRPCRequestException(
108122
"This endpoint requires meta.trace_item_type to be set (are you requesting spans? logs?)"
109123
)
124+
in_msg = _convert_aggregations_to_expressions(in_msg)
110125
resolver = self.get_resolver(in_msg.meta.trace_item_type)
111126
return resolver.resolve(in_msg)

snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
import uuid
22
from collections import defaultdict
3+
from dataclasses import replace
34
from datetime import datetime
45
from typing import Any, Dict, Iterable
56

67
from google.protobuf.json_format import MessageToDict
78
from google.protobuf.timestamp_pb2 import Timestamp
9+
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import DataPoint
10+
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
11+
Expression as ProtoExpression,
12+
)
813
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
9-
DataPoint,
1014
TimeSeries,
1115
TimeSeriesRequest,
1216
TimeSeriesResponse,
@@ -23,6 +27,7 @@
2327
from snuba.query.data_source.simple import Entity
2428
from snuba.query.dsl import Functions as f
2529
from snuba.query.dsl import column
30+
from snuba.query.expressions import Expression
2631
from snuba.query.logical import Query
2732
from snuba.query.query_settings import HTTPQuerySettings
2833
from snuba.request import Request as SnubaRequest
@@ -48,6 +53,13 @@
4853
attribute_key_to_expression,
4954
)
5055

56+
OP_TO_EXPR = {
57+
ProtoExpression.BinaryFormula.OP_ADD: f.plus,
58+
ProtoExpression.BinaryFormula.OP_SUBTRACT: f.minus,
59+
ProtoExpression.BinaryFormula.OP_MULTIPLY: f.multiply,
60+
ProtoExpression.BinaryFormula.OP_DIVIDE: f.divide,
61+
}
62+
5163

5264
def _convert_result_timeseries(
5365
request: TimeSeriesRequest, data: list[Dict[str, Any]]
@@ -94,7 +106,8 @@ def _convert_result_timeseries(
94106

95107
# to convert the results, need to know which were the groupby columns and which ones
96108
# were aggregations
97-
aggregation_labels = set([agg.label for agg in request.aggregations])
109+
aggregation_labels = set([expr.label for expr in request.expressions])
110+
98111
group_by_labels = set([attr.name for attr in request.group_by])
99112

100113
# create a mapping with (all the group by attribute key,val pairs as strs, label name)
@@ -154,7 +167,7 @@ def _convert_result_timeseries(
154167
extrapolation_context = ExtrapolationContext.from_row(
155168
timeseries.label, row_data
156169
)
157-
if extrapolation_context.is_data_present:
170+
if row_data.get(timeseries.label, None) is not None:
158171
timeseries.data_points.append(
159172
DataPoint(
160173
data=row_data[timeseries.label],
@@ -169,23 +182,19 @@ def _convert_result_timeseries(
169182
return result_timeseries.values()
170183

171184

172-
def _build_query(request: TimeSeriesRequest) -> Query:
173-
# TODO: This is hardcoded still
174-
entity = Entity(
175-
key=EntityKey("eap_spans"),
176-
schema=get_entity(EntityKey("eap_spans")).get_data_model(),
177-
sample=None,
178-
)
185+
def _get_reliability_context_columns(
186+
expressions: Iterable[ProtoExpression],
187+
) -> list[SelectedExpression]:
188+
# this reliability logic ignores formulas, meaning formulas may not properly support reliability
189+
additional_context_columns = []
179190

180-
aggregation_columns = [
181-
SelectedExpression(
182-
name=aggregation.label, expression=aggregation_to_expression(aggregation)
183-
)
184-
for aggregation in request.aggregations
185-
]
191+
aggregates = []
192+
for e in expressions:
193+
if e.WhichOneof("expression") == "aggregation":
194+
# ignore formulas
195+
aggregates.append(e.aggregation)
186196

187-
additional_context_columns = []
188-
for aggregation in request.aggregations:
197+
for aggregation in aggregates:
189198
if (
190199
aggregation.extrapolation_mode
191200
== ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED
@@ -211,6 +220,41 @@ def _build_query(request: TimeSeriesRequest) -> Query:
211220
additional_context_columns.append(
212221
SelectedExpression(name=count_column.alias, expression=count_column)
213222
)
223+
return additional_context_columns
224+
225+
226+
def _proto_expression_to_ast_expression(expr: ProtoExpression) -> Expression:
227+
match expr.WhichOneof("expression"):
228+
case "aggregation":
229+
return aggregation_to_expression(expr.aggregation)
230+
case "formula":
231+
formula_expr = OP_TO_EXPR[expr.formula.op](
232+
_proto_expression_to_ast_expression(expr.formula.left),
233+
_proto_expression_to_ast_expression(expr.formula.right),
234+
)
235+
formula_expr = replace(formula_expr, alias=expr.label)
236+
return formula_expr
237+
case default:
238+
raise ValueError(f"Unknown expression type: {default}")
239+
240+
241+
def _build_query(request: TimeSeriesRequest) -> Query:
242+
# TODO: This is hardcoded still
243+
entity = Entity(
244+
key=EntityKey("eap_spans"),
245+
schema=get_entity(EntityKey("eap_spans")).get_data_model(),
246+
sample=None,
247+
)
248+
249+
aggregation_columns = [
250+
SelectedExpression(
251+
name=expr.label,
252+
expression=_proto_expression_to_ast_expression(expr),
253+
)
254+
for expr in request.expressions
255+
]
256+
257+
additional_context_columns = _get_reliability_context_columns(request.expressions)
214258

215259
groupby_columns = [
216260
SelectedExpression(
@@ -302,6 +346,10 @@ def trace_item_type(cls) -> TraceItemType.ValueType:
302346
return TraceItemType.TRACE_ITEM_TYPE_SPAN
303347

304348
def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
349+
# aggregations field is deprecated, it gets converted to request.expressions
350+
# if the user passes it in
351+
assert len(in_msg.aggregations) == 0
352+
305353
snuba_request = _build_snuba_request(in_msg)
306354
res = run_query(
307355
dataset=PluggableDataset(name="eap", all_entities=[]),

snuba/web/rpc/v1/resolvers/R_uptime_checks/resolver_time_series.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from google.protobuf.timestamp_pb2 import Timestamp
88
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
99
DataPoint,
10+
Expression,
1011
TimeSeries,
1112
TimeSeriesRequest,
1213
TimeSeriesResponse,
@@ -31,6 +32,7 @@
3132
extract_response_meta,
3233
setup_trace_query_settings,
3334
)
35+
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
3436
from snuba.web.rpc.v1.resolvers import ResolverTimeSeries
3537
from snuba.web.rpc.v1.resolvers.common.aggregation import aggregation_to_expression
3638
from snuba.web.rpc.v1.resolvers.R_uptime_checks.common.common import (
@@ -40,6 +42,18 @@
4042
)
4143

4244

45+
def _get_aggregation_label(expr: Expression) -> str:
46+
match expr.WhichOneof("expression"):
47+
case "aggregation":
48+
return expr.aggregation.label
49+
case "formula":
50+
raise BadSnubaRPCRequestException(
51+
"formulas are not supported for uptime checks"
52+
)
53+
case default:
54+
raise BadSnubaRPCRequestException(f"Unknown expression type: {default}")
55+
56+
4357
def _convert_result_timeseries(
4458
request: TimeSeriesRequest, data: list[Dict[str, Any]]
4559
) -> Iterable[TimeSeries]:
@@ -85,7 +99,10 @@ def _convert_result_timeseries(
8599

86100
# to convert the results, need to know which were the groupby columns and which ones
87101
# were aggregations
88-
aggregation_labels = set([agg.label for agg in request.aggregations])
102+
aggregation_labels = set(
103+
[_get_aggregation_label(expr) for expr in request.expressions]
104+
)
105+
89106
group_by_labels = set([attr.name for attr in request.group_by])
90107

91108
# create a mapping with (all the group by attribute key,val pairs as strs, label name)
@@ -158,16 +175,25 @@ def _build_query(request: TimeSeriesRequest) -> Query:
158175
sample=None,
159176
)
160177

161-
aggregation_columns = [
162-
SelectedExpression(
163-
name=aggregation.label,
164-
expression=aggregation_to_expression(
165-
aggregation,
166-
attribute_key_to_expression(aggregation.key),
167-
),
168-
)
169-
for aggregation in request.aggregations
170-
]
178+
aggregation_columns = []
179+
for expr in request.expressions:
180+
match expr.WhichOneof("expression"):
181+
case "aggregation":
182+
aggregation_columns.append(
183+
SelectedExpression(
184+
name=expr.aggregation.label,
185+
expression=aggregation_to_expression(
186+
expr.aggregation,
187+
attribute_key_to_expression(expr.aggregation.key),
188+
),
189+
)
190+
)
191+
case "formula":
192+
raise BadSnubaRPCRequestException(
193+
"formulas are not supported for uptime checks"
194+
)
195+
case default:
196+
raise BadSnubaRPCRequestException(f"Unknown expression type: {default}")
171197

172198
groupby_columns = [
173199
SelectedExpression(

tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from google.protobuf.timestamp_pb2 import Timestamp
1010
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
1111
DataPoint,
12+
Expression,
1213
TimeSeries,
1314
TimeSeriesRequest,
1415
)
@@ -903,6 +904,98 @@ def test_OOM(self, monkeypatch: Any) -> None:
903904
sentry_sdk_mock.assert_called_once()
904905
assert metrics_mock.increment.call_args_list.count(call("OOM_query")) == 1
905906

907+
def test_formula(self) -> None:
908+
# store a test metric with a value of 1, every second, for one hour
909+
granularity_secs = 300
910+
query_duration = 60 * 30
911+
store_spans_timeseries(
912+
BASE_TIME,
913+
1,
914+
3600,
915+
metrics=[DummyMetric("test_metric", get_value=lambda x: 1)],
916+
)
917+
message = TimeSeriesRequest(
918+
meta=RequestMeta(
919+
project_ids=[1, 2, 3],
920+
organization_id=1,
921+
cogs_category="something",
922+
referrer="something",
923+
start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
924+
end_timestamp=Timestamp(
925+
seconds=int(BASE_TIME.timestamp() + query_duration)
926+
),
927+
trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
928+
),
929+
expressions=[
930+
Expression(
931+
formula=Expression.BinaryFormula(
932+
op=Expression.BinaryFormula.OP_ADD,
933+
left=Expression(
934+
aggregation=AttributeAggregation(
935+
aggregate=Function.FUNCTION_SUM,
936+
key=AttributeKey(
937+
type=AttributeKey.TYPE_FLOAT, name="test_metric"
938+
),
939+
label="sum",
940+
extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE,
941+
)
942+
),
943+
right=Expression(
944+
aggregation=AttributeAggregation(
945+
aggregate=Function.FUNCTION_AVG,
946+
key=AttributeKey(
947+
type=AttributeKey.TYPE_FLOAT, name="test_metric"
948+
),
949+
label="avg",
950+
extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE,
951+
)
952+
),
953+
),
954+
label="sum + avg",
955+
),
956+
],
957+
granularity_secs=granularity_secs,
958+
)
959+
response = EndpointTimeSeries().execute(message)
960+
expected_buckets = [
961+
Timestamp(seconds=int(BASE_TIME.timestamp()) + secs)
962+
for secs in range(0, query_duration, granularity_secs)
963+
]
964+
expected_avg_timeseries = TimeSeries(
965+
label="avg",
966+
buckets=expected_buckets,
967+
data_points=[
968+
DataPoint(data=1, data_present=True, sample_count=300)
969+
for _ in range(len(expected_buckets))
970+
],
971+
)
972+
expected_sum_timeseries = TimeSeries(
973+
label="sum",
974+
buckets=expected_buckets,
975+
data_points=[
976+
DataPoint(data=300, data_present=True)
977+
for _ in range(len(expected_buckets))
978+
],
979+
)
980+
expected_formula_timeseries = TimeSeries(
981+
label="sum + avg",
982+
buckets=expected_buckets,
983+
data_points=[
984+
DataPoint(
985+
data=sum_datapoint.data + avg_datapoint.data,
986+
data_present=True,
987+
sample_count=sum_datapoint.sample_count,
988+
)
989+
for sum_datapoint, avg_datapoint in zip(
990+
expected_sum_timeseries.data_points,
991+
expected_avg_timeseries.data_points,
992+
)
993+
],
994+
)
995+
assert sorted(response.result_timeseries, key=lambda x: x.label) == [
996+
expected_formula_timeseries
997+
]
998+
906999

9071000
class TestUtils:
9081001
def test_no_duplicate_labels(self) -> None:

0 commit comments

Comments
 (0)