Skip to content

Commit 818962c

Browse files
zhaorui2022 and Rui Zhao authored
fix(embedded): Retry when executing alert queries to avoid sending transient errors to users as alert failure notifications (apache#20419)
Co-authored-by: Rui Zhao <zhaorui@dropbox.com>
1 parent 662bab1 commit 818962c

File tree

4 files changed

+132
-1
lines changed

4 files changed

+132
-1
lines changed

superset/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,6 +1078,9 @@ def SQL_QUERY_MUTATOR( # pylint: disable=invalid-name,unused-argument
10781078
# If set to true no notification is sent, the worker will just log a message.
10791079
# Useful for debugging
10801080
ALERT_REPORTS_NOTIFICATION_DRY_RUN = False
1081+
# Max tries to run queries to prevent false errors caused by transient errors
1082+
# being returned to users. Set to a value >1 to enable retries.
1083+
ALERT_REPORTS_QUERY_EXECUTION_MAX_TRIES = 1
10811084

10821085
# A custom prefix to use on all Alerts & Reports emails
10831086
EMAIL_REPORTS_SUBJECT_PREFIX = "[Report] "

superset/reports/commands/alert.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
AlertValidatorConfigError,
3838
)
3939
from superset.utils.core import override_user
40+
from superset.utils.retries import retry_call
4041

4142
logger = logging.getLogger(__name__)
4243

@@ -171,7 +172,13 @@ def validate(self) -> None:
171172
"""
172173
Validate the query result as a Pandas DataFrame
173174
"""
174-
df = self._execute_query()
175+
# When there are transient errors when executing queries, users will get
176+
# notified with the error stacktrace which can be avoided by retrying
177+
df = retry_call(
178+
self._execute_query,
179+
exception=AlertQueryError,
180+
max_tries=app.config["ALERT_REPORTS_QUERY_EXECUTION_MAX_TRIES"],
181+
)
175182

176183
if df.empty and self._is_validator_not_null:
177184
self._result = None
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
# pylint: disable=invalid-name, unused-argument, import-outside-toplevel
18+
19+
import pandas as pd
20+
from pytest_mock import MockFixture
21+
22+
23+
def test_execute_query_succeeded_no_retry(
24+
mocker: MockFixture, app_context: None
25+
) -> None:
26+
27+
from superset.reports.commands.alert import AlertCommand
28+
29+
execute_query_mock = mocker.patch(
30+
"superset.reports.commands.alert.AlertCommand._execute_query",
31+
side_effect=lambda: pd.DataFrame([{"sample_col": 0}]),
32+
)
33+
34+
command = AlertCommand(report_schedule=mocker.Mock())
35+
36+
command.validate()
37+
38+
assert execute_query_mock.call_count == 1
39+
40+
41+
def test_execute_query_succeeded_with_retries(
42+
mocker: MockFixture, app_context: None
43+
) -> None:
44+
from superset.reports.commands.alert import AlertCommand, AlertQueryError
45+
46+
execute_query_mock = mocker.patch(
47+
"superset.reports.commands.alert.AlertCommand._execute_query"
48+
)
49+
50+
query_executed_count = 0
51+
# Should match the value defined in superset_test_config.py
52+
expected_max_retries = 3
53+
54+
def _mocked_execute_query() -> pd.DataFrame:
55+
nonlocal query_executed_count
56+
query_executed_count += 1
57+
58+
if query_executed_count < expected_max_retries:
59+
raise AlertQueryError()
60+
else:
61+
return pd.DataFrame([{"sample_col": 0}])
62+
63+
execute_query_mock.side_effect = _mocked_execute_query
64+
execute_query_mock.__name__ = "mocked_execute_query"
65+
66+
command = AlertCommand(report_schedule=mocker.Mock())
67+
68+
command.validate()
69+
70+
assert execute_query_mock.call_count == expected_max_retries
71+
72+
73+
def test_execute_query_failed_no_retry(mocker: MockFixture, app_context: None) -> None:
74+
from superset.reports.commands.alert import AlertCommand, AlertQueryTimeout
75+
76+
execute_query_mock = mocker.patch(
77+
"superset.reports.commands.alert.AlertCommand._execute_query"
78+
)
79+
80+
def _mocked_execute_query() -> None:
81+
raise AlertQueryTimeout
82+
83+
execute_query_mock.side_effect = _mocked_execute_query
84+
execute_query_mock.__name__ = "mocked_execute_query"
85+
86+
command = AlertCommand(report_schedule=mocker.Mock())
87+
88+
try:
89+
command.validate()
90+
except AlertQueryTimeout:
91+
pass
92+
93+
assert execute_query_mock.call_count == 1
94+
95+
96+
def test_execute_query_failed_max_retries(
97+
mocker: MockFixture, app_context: None
98+
) -> None:
99+
from superset.reports.commands.alert import AlertCommand, AlertQueryError
100+
101+
execute_query_mock = mocker.patch(
102+
"superset.reports.commands.alert.AlertCommand._execute_query"
103+
)
104+
105+
def _mocked_execute_query() -> None:
106+
raise AlertQueryError
107+
108+
execute_query_mock.side_effect = _mocked_execute_query
109+
execute_query_mock.__name__ = "mocked_execute_query"
110+
111+
command = AlertCommand(report_schedule=mocker.Mock())
112+
113+
try:
114+
command.validate()
115+
except AlertQueryError:
116+
pass
117+
118+
# Should match the value defined in superset_test_config.py
119+
assert execute_query_mock.call_count == 3

tests/integration_tests/superset_test_config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ def GET_FEATURE_FLAGS_FUNC(ff):
115115

116116
ALERT_REPORTS_WORKING_TIME_OUT_KILL = True
117117

118+
ALERT_REPORTS_QUERY_EXECUTION_MAX_TRIES = 3
119+
118120

119121
class CeleryConfig(object):
120122
BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_CELERY_DB}"

0 commit comments

Comments
 (0)