
Commit 9654ebf

feat(query-pipeline): Add configs to customize maximum size of a query (#7546)
1 parent 09d25c2

3 files changed: +97 additions, -8 deletions

snuba/pipeline/stages/query_execution.py

Lines changed: 30 additions & 4 deletions

@@ -11,6 +11,7 @@
 
 from snuba import environment
 from snuba import settings as snuba_settings
+from snuba import state
 from snuba.attribution.attribution_info import AttributionInfo
 from snuba.clickhouse.formatter.query import format_query
 from snuba.clickhouse.query import Query as ClickhouseQuery
@@ -45,6 +46,11 @@
 metrics = MetricsWrapper(environment.metrics, "api")
 logger = logging.getLogger("snuba.pipeline.stages.query_execution")
 
+DISABLE_MAX_QUERY_SIZE_CHECK_FOR_CLUSTERS_CONFIG = (
+    "ExecutionStage.disable_max_query_size_check_for_clusters"
+)
+MAX_QUERY_SIZE_BYTES_CONFIG = "ExecutionStage.max_query_size_bytes"
+
 
 class ExecutionStage(QueryPipelineStage[ClickhouseQuery | CompositeQuery[Table], QueryResult]):
     def __init__(
@@ -157,6 +163,21 @@ def _run_and_apply_column_names(
     return result
 
 
+def _max_query_size_bytes() -> int:
+    return (
+        state.get_int_config(MAX_QUERY_SIZE_BYTES_CONFIG, MAX_QUERY_SIZE_BYTES)
+        or MAX_QUERY_SIZE_BYTES
+    )
+
+
+def _disable_max_query_size_check_for_clusters() -> set[str]:
+    return set(
+        (state.get_str_config(DISABLE_MAX_QUERY_SIZE_CHECK_FOR_CLUSTERS_CONFIG, "") or "").split(
+            ","
+        )
+    )
+
+
 def _format_storage_query_and_run(
     timer: Timer,
     query_metadata: SnubaQueryMetadata,
@@ -207,11 +228,16 @@ def _format_storage_query_and_run(
         "cluster_name": cluster_name,
     }
 
-    if query_size_bytes > MAX_QUERY_SIZE_BYTES:
+    if (
+        not cluster_name
+        or
+        # This will force a fallback on the ClickHouse limit.
+        cluster_name not in _disable_max_query_size_check_for_clusters()
+    ) and query_size_bytes > _max_query_size_bytes():
         cause = QueryTooLongException(
             f"After processing, query is {query_size_bytes} bytes, "
             "which is too long for ClickHouse to process. "
-            f"Max size is {MAX_QUERY_SIZE_BYTES} bytes."
+            f"Max size is {_max_query_size_bytes()} bytes."
         )
         stats = update_query_metadata_and_stats(
             query=clickhouse_query,
@@ -273,10 +299,10 @@ def get_query_size_group(query_size_bytes: int) -> str:
     Eg. If the query size is equal to the max query size, this function
     returns "100%".
     """
-    if query_size_bytes == MAX_QUERY_SIZE_BYTES:
+    if query_size_bytes == _max_query_size_bytes():
         return "100%"
     else:
-        query_size_group = int(floor(query_size_bytes / MAX_QUERY_SIZE_BYTES * 10)) * 10
+        query_size_group = int(floor(query_size_bytes / _max_query_size_bytes() * 10)) * 10
         return f">={query_size_group}%"
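Taken together, the two new module-level names turn the previously hard-coded limit into a runtime knob. A minimal sketch of how an operator might set the configs (the keys, state.set_config, and the comma-separated list format all come from this diff; the cluster names below are hypothetical):

from snuba import state
from snuba.pipeline.stages.query_execution import (
    DISABLE_MAX_QUERY_SIZE_CHECK_FOR_CLUSTERS_CONFIG,
    MAX_QUERY_SIZE_BYTES_CONFIG,
)

# Raise the post-processing size limit to 512 KiB for all queries.
state.set_config(MAX_QUERY_SIZE_BYTES_CONFIG, 512 * 1024)

# Skip Snuba's size check for these clusters (hypothetical names) and fall
# back on ClickHouse's own limit; the value is a comma-separated list.
state.set_config(
    DISABLE_MAX_QUERY_SIZE_CHECK_FOR_CLUSTERS_CONFIG,
    "errors_cluster,transactions_cluster",
)

Note the "or MAX_QUERY_SIZE_BYTES" fallback in _max_query_size_bytes(): a falsy value (0, or no config set) reverts to the hard-coded default rather than removing the limit.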

snuba/settings/__init__.py

Lines changed: 1 addition & 0 deletions

@@ -119,6 +119,7 @@
             "profile_chunks",
         },
         "single_node": True,
+        "cluster_name": "test_cluster",
     },
 ]
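This one-line change gives the default test cluster an explicit name, which is what the new test_disable_max_query_size_check test below matches against. A sketch of the resulting entry's shape (unrelated keys elided; see the file for the full definition):

CLUSTERS = [
    {
        # ... connection and storage-set keys elided ...
        "single_node": True,
        "cluster_name": "test_cluster",
    },
]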

tests/pipeline/test_execution_stage.py

Lines changed: 66 additions & 4 deletions

@@ -3,6 +3,7 @@
 import pytest
 
 from snuba import settings as snubasettings
+from snuba import state
 from snuba.attribution import get_app_id
 from snuba.attribution.attribution_info import AttributionInfo
 from snuba.clickhouse.columns import ColumnSet
@@ -11,7 +12,11 @@
 from snuba.datasets.entities.entity_key import EntityKey
 from snuba.datasets.storages.storage_key import StorageKey
 from snuba.pipeline.query_pipeline import QueryPipelineResult
-from snuba.pipeline.stages.query_execution import ExecutionStage
+from snuba.pipeline.stages.query_execution import (
+    DISABLE_MAX_QUERY_SIZE_CHECK_FOR_CLUSTERS_CONFIG,
+    MAX_QUERY_SIZE_BYTES_CONFIG,
+    ExecutionStage,
+)
 from snuba.query import SelectedExpression
 from snuba.query.allocation_policies import (
     MAX_THRESHOLD,
@@ -35,6 +40,7 @@
 from snuba.request import Request
 from snuba.utils.metrics.timer import Timer
 from snuba.utils.schemas import UUID, String, UInt
+from snuba.web import QueryException
 
 
 class MockAllocationPolicy(AllocationPolicy):
@@ -72,9 +78,7 @@ def get_fake_metadata() -> SnubaQueryMetadata:
         Request(
             uuid.uuid4(),
             {},
-            LogicalQuery(
-                from_clause=Entity(key=EntityKey.TRANSACTIONS, schema=ColumnSet([]))
-            ),
+            LogicalQuery(from_clause=Entity(key=EntityKey.TRANSACTIONS, schema=ColumnSet([]))),
             HTTPQuerySettings(),
             AttributionInfo(
                 get_app_id("blah"),
@@ -220,3 +224,61 @@ def test_turbo(ch_query: Query) -> None:
         and "avg(duration)" in res.data.result["data"][0]
     )
     assert ch_query.get_from_clause().sampling_rate == snubasettings.TURBO_SAMPLE_RATE
+
+
+@pytest.mark.clickhouse_db
+@pytest.mark.redis_db
+def test_max_query_size_bytes(ch_query: Query) -> None:
+    attinfo = AttributionInfo(
+        get_app_id("blah"), {"tenant_type": "tenant_id"}, "blah", None, None, None
+    )
+    settings = HTTPQuerySettings()
+    timer = Timer("test")
+    metadata = get_fake_metadata()
+
+    state.set_config(MAX_QUERY_SIZE_BYTES_CONFIG, 1)
+
+    res = ExecutionStage(attinfo, query_metadata=metadata).execute(
+        QueryPipelineResult(
+            data=ch_query,
+            query_settings=settings,
+            timer=timer,
+            error=None,
+        )
+    )
+
+    assert res.data is None
+    assert isinstance(res.error, QueryException)
+    assert "which is too long for ClickHouse to process" in res.error.message
+
+
+@pytest.mark.clickhouse_db
+@pytest.mark.redis_db
+def test_disable_max_query_size_check(ch_query: Query) -> None:
+    attinfo = AttributionInfo(
+        get_app_id("blah"), {"tenant_type": "tenant_id"}, "blah", None, None, None
+    )
+    settings = HTTPQuerySettings()
+    timer = Timer("test")
+    metadata = get_fake_metadata()
+    cluster_name = (
+        snubasettings.CLUSTERS[0]["cluster_name"]
+        if "cluster_name" in snubasettings.CLUSTERS[0]
+        else "test_cluster"
+    )
+
+    # Lowering this should make the query too big...
+    state.set_config(MAX_QUERY_SIZE_BYTES_CONFIG, 1)
+    # Unless we disable the check for this cluster.
+    state.set_config(DISABLE_MAX_QUERY_SIZE_CHECK_FOR_CLUSTERS_CONFIG, cluster_name)
+
+    res = ExecutionStage(attinfo, query_metadata=metadata).execute(
+        QueryPipelineResult(
+            data=ch_query,
+            query_settings=settings,
+            timer=timer,
+            error=None,
+        )
+    )
+
+    assert res.data
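For completeness, a small sketch of the fallback semantics these tests rely on, assuming the same redis-backed state the redis_db fixture provides:

from snuba import state
from snuba.pipeline.stages.query_execution import (
    MAX_QUERY_SIZE_BYTES_CONFIG,
    _max_query_size_bytes,
)

# A falsy config value (0, or nothing set) takes the
# "or MAX_QUERY_SIZE_BYTES" branch, so the default limit still applies.
state.set_config(MAX_QUERY_SIZE_BYTES_CONFIG, 0)
assert _max_query_size_bytes() > 1

# Any positive value overrides the default, as test_max_query_size_bytes does.
state.set_config(MAX_QUERY_SIZE_BYTES_CONFIG, 1)
assert _max_query_size_bytes() == 1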
