
Commit 25bde9f

feat: retry AI/ML jobs that fail more often (#1965)

* feat: retry AI/ML jobs that fail more often
* Update .pre-commit-config.yaml
* Update pyproject.toml
* 🦉 Updates from OwlBot post-processor (applied three times); see https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* fix mypy

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>

1 parent 827f5d5 · commit 25bde9f

3 files changed (+265, -2 lines)

bigframes/session/__init__.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -40,6 +40,7 @@
 import weakref
 
 import bigframes_vendored.constants as constants
+import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry
 import bigframes_vendored.ibis.backends.bigquery as ibis_bigquery  # noqa
 import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import bigframes_vendored.pandas.io.parquet as third_party_pandas_parquet
@@ -2051,6 +2052,7 @@ def _start_query_ml_ddl(
             project=None,
             timeout=None,
             query_with_job=True,
+            job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
         )
         return iterator, query_job
 
```
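Note: `DEFAULT_ML_JOB_RETRY` comes from the vendored retry module added by this commit (full listing below); it pairs the standard job-retry predicate with a one-hour deadline so that slow-failing AI/ML jobs still get retried. As a minimal sketch, assuming only `google-api-core` (the predicate here is a simplified stand-in, not the vendored one), an equivalently configured retry looks like this:

```python
# Illustrative sketch only; the commit itself uses the vendored
# third_party_gcb_retry.DEFAULT_ML_JOB_RETRY defined later in this diff.
from google.api_core import retry

_HOUR_IN_SECONDS = 60.0 * 60.0


def _simplified_job_should_retry(exc) -> bool:
    # Stand-in predicate: retry on the ambiguous BigQuery error reasons that
    # the vendored module also treats as retryable.
    retryable = {"rateLimitExceeded", "backendError", "internalError"}
    errors = getattr(exc, "errors", None) or []
    return bool(errors) and errors[0].get("reason") in retryable


ml_job_retry = retry.Retry(
    predicate=_simplified_job_should_retry,
    deadline=_HOUR_IN_SECONDS,  # AI/ML jobs can take a long time to fail
)
```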

bigframes/session/_io/bigquery/__init__.py

Lines changed: 43 additions & 2 deletions
```diff
@@ -24,8 +24,10 @@
 import typing
 from typing import Dict, Iterable, Literal, Mapping, Optional, overload, Tuple, Union
 
+import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry
 import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
 import google.api_core.exceptions
+import google.api_core.retry
 import google.cloud.bigquery as bigquery
 
 from bigframes.core import log_adapter
@@ -245,7 +247,7 @@ def start_query_with_client(
     location: Optional[str],
     project: Optional[str],
     timeout: Optional[float],
-    metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
+    metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
     query_with_job: Literal[True],
 ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
     ...
@@ -260,8 +262,40 @@ def start_query_with_client(
     location: Optional[str],
     project: Optional[str],
     timeout: Optional[float],
-    metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
+    metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
+    query_with_job: Literal[False],
+) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
+    ...
+
+
+@overload
+def start_query_with_client(
+    bq_client: bigquery.Client,
+    sql: str,
+    *,
+    job_config: bigquery.QueryJobConfig,
+    location: Optional[str],
+    project: Optional[str],
+    timeout: Optional[float],
+    metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
+    query_with_job: Literal[True],
+    job_retry: google.api_core.retry.Retry,
+) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
+    ...
+
+
+@overload
+def start_query_with_client(
+    bq_client: bigquery.Client,
+    sql: str,
+    *,
+    job_config: bigquery.QueryJobConfig,
+    location: Optional[str],
+    project: Optional[str],
+    timeout: Optional[float],
+    metrics: Optional[bigframes.session.metrics.ExecutionMetrics],
     query_with_job: Literal[False],
+    job_retry: google.api_core.retry.Retry,
 ) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
     ...
 
@@ -276,6 +310,11 @@ def start_query_with_client(
     timeout: Optional[float] = None,
     metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
     query_with_job: bool = True,
+    # TODO(tswast): We can stop providing our own default once we use a
+    # google-cloud-bigquery version with
+    # https://github.com/googleapis/python-bigquery/pull/2256 merged, likely
+    # version 3.36.0 or later.
+    job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY,
 ) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]:
     """
     Starts query job and waits for results.
@@ -292,6 +331,7 @@ def start_query_with_client(
             location=location,
             project=project,
             api_timeout=timeout,
+            job_retry=job_retry,
         )
         if metrics is not None:
             metrics.count_job_stats(row_iterator=results_iterator)
@@ -303,6 +343,7 @@ def start_query_with_client(
             location=location,
             project=project,
             timeout=timeout,
+            job_retry=job_retry,
         )
     except google.api_core.exceptions.Forbidden as ex:
         if "Drive credentials" in ex.message:
```
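For orientation, here is a hedged call-site sketch of the widened helper; the client, job config, and DDL string are placeholders, and the module alias mirrors the imports shown above. The ML DDL path in `bigframes/session/__init__.py` makes essentially this call with `DEFAULT_ML_JOB_RETRY`, while callers that omit `job_retry` get the vendored `DEFAULT_JOB_RETRY` default:

```python
# Illustrative call site, not code from the commit.
import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry
import google.cloud.bigquery as bigquery

import bigframes.session._io.bigquery as bf_io_bigquery

bq_client = bigquery.Client()  # assumes application default credentials
job_config = bigquery.QueryJobConfig()

iterator, query_job = bf_io_bigquery.start_query_with_client(
    bq_client,
    "CREATE OR REPLACE MODEL `my_dataset.my_model` ...",  # placeholder DDL
    job_config=job_config,
    location=None,
    project=None,
    timeout=None,
    metrics=None,
    query_with_job=True,
    # Long-deadline retry for ML work; omit to keep DEFAULT_JOB_RETRY.
    job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
)
```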
bigframes_vendored/google_cloud_bigquery/retry.py (new file, vendored module path inferred from the import above)

Lines changed: 220 additions & 0 deletions
```python
# Original: https://github.com/googleapis/python-bigquery/blob/main/google/cloud/bigquery/retry.py
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.api_core import exceptions, retry
import google.api_core.future.polling
from google.auth import exceptions as auth_exceptions  # type: ignore
import requests.exceptions

_RETRYABLE_REASONS = frozenset(
    ["rateLimitExceeded", "backendError", "internalError", "badGateway"]
)

_UNSTRUCTURED_RETRYABLE_TYPES = (
    ConnectionError,
    exceptions.TooManyRequests,
    exceptions.InternalServerError,
    exceptions.BadGateway,
    exceptions.ServiceUnavailable,
    requests.exceptions.ChunkedEncodingError,
    requests.exceptions.ConnectionError,
    requests.exceptions.Timeout,
    auth_exceptions.TransportError,
)

_MINUTE_IN_SECONDS = 60.0
_HOUR_IN_SECONDS = 60.0 * _MINUTE_IN_SECONDS
_DEFAULT_RETRY_DEADLINE = 10.0 * _MINUTE_IN_SECONDS

# Ambiguous errors (e.g. internalError, backendError, rateLimitExceeded) retry
# until the full `_DEFAULT_RETRY_DEADLINE`. This is because the
# `jobs.getQueryResults` REST API translates a job failure into an HTTP error.
#
# TODO(https://github.com/googleapis/python-bigquery/issues/1903): Investigate
# if we can fail early for ambiguous errors in `QueryJob.result()`'s call to
# the `jobs.getQueryResult` API.
#
# We need `_DEFAULT_JOB_DEADLINE` to be some multiple of
# `_DEFAULT_RETRY_DEADLINE` to allow for a few retries after the retry
# timeout is reached.
#
# Note: This multiple should actually be a multiple of
# (2 * _DEFAULT_RETRY_DEADLINE). After an ambiguous exception, the first
# call from `job_retry()` refreshes the job state without actually restarting
# the query. The second `job_retry()` actually restarts the query. For a more
# detailed explanation, see the comments where we set `restart_query_job = True`
# in `QueryJob.result()`'s inner `is_job_done()` function.
_DEFAULT_JOB_DEADLINE = 2.0 * (2.0 * _DEFAULT_RETRY_DEADLINE)


def _should_retry(exc):
    """Predicate for determining when to retry.

    We retry if and only if the 'reason' is 'backendError'
    or 'rateLimitExceeded'.
    """
    if not hasattr(exc, "errors") or len(exc.errors) == 0:
        # Check for unstructured error returns, e.g. from GFE
        return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES)

    reason = exc.errors[0]["reason"]
    return reason in _RETRYABLE_REASONS


DEFAULT_RETRY = retry.Retry(predicate=_should_retry, deadline=_DEFAULT_RETRY_DEADLINE)
"""The default retry object.

Any method with a ``retry`` parameter will be retried automatically,
with reasonable defaults. To disable retry, pass ``retry=None``.
To modify the default retry behavior, call a ``with_XXX`` method
on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds,
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""


def _should_retry_get_job_conflict(exc):
    """Predicate for determining when to retry a jobs.get call after a conflict error.

    Sometimes we get a 404 after a Conflict. In this case, we
    have pretty high confidence that by retrying the 404, we'll
    (hopefully) eventually recover the job.
    https://github.com/googleapis/python-bigquery/issues/2134

    Note: we may be able to extend this to user-specified predicates
    after https://github.com/googleapis/python-api-core/issues/796
    to tweak existing Retry object predicates.
    """
    return isinstance(exc, exceptions.NotFound) or _should_retry(exc)


# Pick a deadline smaller than our other deadlines since we want to timeout
# before those expire.
_DEFAULT_GET_JOB_CONFLICT_DEADLINE = _DEFAULT_RETRY_DEADLINE / 3.0
_DEFAULT_GET_JOB_CONFLICT_RETRY = retry.Retry(
    predicate=_should_retry_get_job_conflict,
    deadline=_DEFAULT_GET_JOB_CONFLICT_DEADLINE,
)
"""Private, may be removed in future."""


# Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We
# briefly had a default timeout, but even setting it at more than twice the
# theoretical server-side default timeout of 2 minutes was not enough for
# complex queries. See:
# https://github.com/googleapis/python-bigquery/issues/970#issuecomment-921934647
DEFAULT_TIMEOUT = None
"""The default API timeout.

This is the time to wait per request. To adjust the total wait time, set a
deadline on the retry object.
"""

job_retry_reasons = (
    "rateLimitExceeded",
    "backendError",
    "internalError",
    "jobBackendError",
    "jobInternalError",
    "jobRateLimitExceeded",
)


def _job_should_retry(exc):
    # Sometimes we have ambiguous errors, such as 'backendError' which could
    # be due to an API problem or a job problem. For these, make sure we retry
    # our is_job_done() function.
    #
    # Note: This won't restart the job unless we know for sure it's because of
    # the job status and set restart_query_job = True in that loop. This means
    # that we might end up calling this predicate twice for the same job
    # but from different paths: (1) from jobs.getQueryResults RetryError and
    # (2) from translating the job error from the body of a jobs.get response.
    #
    # Note: If we start retrying job types other than queries where we don't
    # call the problematic getQueryResults API to check the status, we need
    # to provide a different predicate, as there shouldn't be ambiguous
    # errors in those cases.
    if isinstance(exc, exceptions.RetryError):
        exc = exc.cause

    # Per https://github.com/googleapis/python-bigquery/issues/1929, sometimes
    # retriable errors make their way here. Because of the separate
    # `restart_query_job` logic to make sure we aren't restarting non-failed
    # jobs, it should be safe to continue and not totally fail our attempt at
    # waiting for the query to complete.
    if _should_retry(exc):
        return True

    if not hasattr(exc, "errors") or len(exc.errors) == 0:
        return False

    reason = exc.errors[0]["reason"]
    return reason in job_retry_reasons


DEFAULT_JOB_RETRY = retry.Retry(
    predicate=_job_should_retry, deadline=_DEFAULT_JOB_DEADLINE
)
"""
The default job retry object.
"""


DEFAULT_ML_JOB_RETRY = retry.Retry(
    predicate=_job_should_retry, deadline=_HOUR_IN_SECONDS
)
"""
The default job retry object for AI/ML jobs.

Such jobs can take a long time to fail. See: b/436586523.
"""


def _query_job_insert_should_retry(exc):
    # Per https://github.com/googleapis/python-bigquery/issues/2134, sometimes
    # we get a 404 error. In this case, if we get this far, assume that the job
    # doesn't actually exist and try again. We can't add 404 to the default
    # job_retry because that happens for errors like "this table does not
    # exist", which probably won't resolve with a retry.
    if isinstance(exc, exceptions.RetryError):
        exc = exc.cause

    if isinstance(exc, exceptions.NotFound):
        message = exc.message
        # Don't try to retry table/dataset not found, just job not found.
        # The URL contains jobs, so use whitespace to disambiguate.
        return message is not None and " job" in message.lower()

    return _job_should_retry(exc)


_DEFAULT_QUERY_JOB_INSERT_RETRY = retry.Retry(
    predicate=_query_job_insert_should_retry,
    # jobs.insert doesn't wait for the job to complete, so we don't need the
    # long _DEFAULT_JOB_DEADLINE for this part.
    deadline=_DEFAULT_RETRY_DEADLINE,
)
"""Private, may be removed in future."""


DEFAULT_GET_JOB_TIMEOUT = 128
"""
Default timeout for Client.get_job().
"""

POLLING_DEFAULT_VALUE = google.api_core.future.polling.PollingFuture._DEFAULT_VALUE
"""
Default value defined in google.api_core.future.polling.PollingFuture.
"""
```

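Since these defaults are ordinary `google.api_core.retry.Retry` objects, callers can derive variants without editing the vendored module; the `DEFAULT_RETRY` docstring above shows the same `with_deadline` pattern. A short, hedged sketch:

```python
# Sketch only, not part of the commit.
from google.api_core import exceptions

import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry

# Derive an ML job retry that gives up after 30 minutes instead of an hour.
shorter_ml_retry = third_party_gcb_retry.DEFAULT_ML_JOB_RETRY.with_deadline(30 * 60)

# The shared predicate treats ambiguous, structured server errors as retryable.
err = exceptions.InternalServerError("boom", errors=[{"reason": "backendError"}])
assert third_party_gcb_retry._job_should_retry(err)
```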