Skip to content

Commit e0b9ec7

Browse files
committed
updates retry handling and testing of retry handling
1 parent 028f741 commit e0b9ec7

File tree

2 files changed

+104
-8
lines changed

2 files changed

+104
-8
lines changed

google/cloud/bigquery/job/base.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,8 +1044,7 @@ def result( # type: ignore # (incompatible with supertype)
10441044
if self.state is None:
10451045
self._begin(retry=retry, timeout=timeout)
10461046

1047-
kwargs = {"retry": retry}
1048-
return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
1047+
return super(_AsyncJob, self).result(timeout=timeout, retry=retry)
10491048

10501049
def cancelled(self):
10511050
"""Check if the job has been cancelled.

tests/unit/test_job_retry.py

Lines changed: 103 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -618,25 +618,26 @@ def test_query_and_wait_retries_job_for_DDL_queries(global_time_lock):
618618

619619

620620
@pytest.mark.parametrize(
621-
"result_retry",
621+
"result_retry_param",
622622
[
623623
pytest.param(
624624
{},
625-
id="default retry use case",
625+
id="default retry {}",
626626
),
627627
pytest.param(
628628
{
629629
"retry": google.cloud.bigquery.retry.DEFAULT_RETRY.with_timeout(
630630
timeout=10.0
631631
)
632632
},
633-
id="custom retry object use case",
633+
id="custom retry object with timeout 10.0",
634634
),
635635
],
636636
)
637-
def test_retry_load_job_result(result_retry, PROJECT, DS_ID):
637+
def test_retry_load_job_result(result_retry_param, PROJECT, DS_ID):
638638
from google.cloud.bigquery.dataset import DatasetReference
639639
from google.cloud.bigquery.job.load import LoadJob
640+
import google.cloud.bigquery.retry
640641

641642
client = make_client()
642643
conn = client._connection = make_connection(
@@ -654,10 +655,106 @@ def test_retry_load_job_result(result_retry, PROJECT, DS_ID):
654655

655656
table_ref = DatasetReference(project=PROJECT, dataset_id=DS_ID).table("new_table")
656657
job = LoadJob("id_1", source_uris=None, destination=table_ref, client=client)
657-
result = job.result(**result_retry)
658+
659+
with mock.patch.object(
660+
client, "_call_api", wraps=client._call_api
661+
) as wrapped_call_api:
662+
result = job.result(**result_retry_param)
658663

659664
assert job.state == "DONE"
660665
assert result.output_rows == 1
661666

662-
# We made all the calls we expected to.
667+
# Check that _call_api was called multiple times due to retry
668+
assert wrapped_call_api.call_count > 1
669+
670+
# Verify the retry object used in the calls to _call_api
671+
expected_retry = result_retry_param.get(
672+
"retry", google.cloud.bigquery.retry.DEFAULT_RETRY
673+
)
674+
675+
for call in wrapped_call_api.mock_calls:
676+
name, args, kwargs = call
677+
# The retry object is the first positional argument to _call_api
678+
called_retry = args[0]
679+
680+
# We only care about the calls made during the job.result() polling
681+
if kwargs.get("method") == "GET" and "jobs/id_1" in kwargs.get("path", ""):
682+
assert called_retry._predicate == expected_retry._predicate
683+
assert called_retry._initial == expected_retry._initial
684+
assert called_retry._maximum == expected_retry._maximum
685+
assert called_retry._multiplier == expected_retry._multiplier
686+
assert called_retry._deadline == expected_retry._deadline
687+
if "retry" in result_retry_param:
688+
# Specifically check the timeout for the custom retry case
689+
assert called_retry._timeout == 10.0
690+
print(10.0)
691+
else:
692+
assert called_retry._timeout == expected_retry._timeout
693+
print("not 10.0")
694+
695+
# The number of api_request calls should still be 3
663696
assert conn.api_request.call_count == 3
697+
698+
699+
# @pytest.mark.parametrize(
700+
# "result_retry",
701+
# [
702+
# pytest.param(
703+
# {},
704+
# id="default retry use case",
705+
# ),
706+
# pytest.param(
707+
# {
708+
# "retry": google.cloud.bigquery.retry.DEFAULT_RETRY.with_timeout(
709+
# timeout=10.0
710+
# )
711+
# },
712+
# id="custom retry object use case",
713+
# ),
714+
# ],
715+
# )
716+
# def test_retry_load_job_result(result_retry, PROJECT, DS_ID):
717+
718+
# from google.cloud.bigquery.dataset import DatasetReference
719+
# from google.cloud.bigquery.job.load import LoadJob
720+
# import google.cloud.bigquery.retry
721+
722+
# client = make_client()
723+
# conn = client._connection = make_connection(
724+
# dict(
725+
# status=dict(state="RUNNING"),
726+
# jobReference={"jobId": "id_1"},
727+
# ),
728+
# google.api_core.exceptions.ServiceUnavailable("retry me"),
729+
# dict(
730+
# status=dict(state="DONE"),
731+
# jobReference={"jobId": "id_1"},
732+
# statistics={"load": {"outputRows": 1}},
733+
# ),
734+
# )
735+
736+
# table_ref = DatasetReference(project=PROJECT, dataset_id=DS_ID).table("new_table")
737+
# job = LoadJob("id_1", source_uris=None, destination=table_ref, client=client)
738+
# result = job.result(**result_retry)
739+
740+
# print(f"Test Case ID: {result_retry}")
741+
# for call in conn.api_request.mock_calls:
742+
# name, args, kwargs = call
743+
# print(f" Call to api_request with kwargs: {kwargs}")
744+
745+
# if result_retry:
746+
# custom_retry_used = False
747+
# for name, args, kwargs in conn.api_request.mock_calls:
748+
# if 'retry' in kwargs:
749+
# # Check if the retry object passed has the custom timeout
750+
# if kwargs['retry']._timeout == 10.0:
751+
# custom_retry_used = True
752+
# assert kwargs['retry']._deadline == 10.0 # Deadline should also be updated
753+
# # Optional: check other properties if needed
754+
# assert custom_retry_used, "Custom retry object with timeout 10.0 was not used"
755+
756+
# assert job.state == "DONE"
757+
# assert result.output_rows == 1
758+
759+
# # We made all the calls we expected to.
760+
# assert conn.api_request.call_count == 3

0 commit comments

Comments (0)