Skip to content

Commit ef2740a

Browse files
authored
feat: updates to fastpath query execution (#2268)
This PR updates query handling to allow base config properties like job timeout, reservation, and a preview max slots field to leverage the faster path (e.g. using jobs.query rather than jobs.insert).
1 parent 84fa75b commit ef2740a

File tree

5 files changed

+108
-0
lines changed

5 files changed

+108
-0
lines changed

google/cloud/bigquery/_job_helpers.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,9 @@ def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
658658
"requestId",
659659
"createSession",
660660
"writeIncrementalResults",
661+
"jobTimeoutMs",
662+
"reservation",
663+
"maxSlots",
661664
}
662665

663666
unsupported_keys = request_keys - keys_allowlist

google/cloud/bigquery/job/base.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,37 @@ def job_timeout_ms(self, value):
224224
else:
225225
self._properties.pop("jobTimeoutMs", None)
226226

227+
@property
228+
def max_slots(self) -> Optional[int]:
229+
"""The maximum rate of slot consumption to allow for this job.
230+
231+
If set, the number of slots used to execute the job will be throttled
232+
to try and keep its slot consumption below the requested rate.
233+
This feature is not generally available.
234+
"""
235+
236+
max_slots = self._properties.get("maxSlots")
237+
if max_slots is not None:
238+
if isinstance(max_slots, str):
239+
return int(max_slots)
240+
if isinstance(max_slots, int):
241+
return max_slots
242+
return None
243+
244+
@max_slots.setter
245+
def max_slots(self, value):
246+
try:
247+
value = _int_or_none(value)
248+
except ValueError as err:
249+
raise ValueError("Pass an int for max slots, e.g. 100").with_traceback(
250+
err.__traceback__
251+
)
252+
253+
if value is not None:
254+
self._properties["maxSlots"] = str(value)
255+
else:
256+
self._properties.pop("maxSlots", None)
257+
227258
@property
228259
def reservation(self):
229260
"""str: Optional. The reservation that job would use.

tests/unit/job/test_base.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,3 +1276,44 @@ def test_reservation_setter(self):
12761276
job_config = self._make_one()
12771277
job_config.reservation = "foo"
12781278
self.assertEqual(job_config._properties["reservation"], "foo")
1279+
1280+
def test_max_slots_miss(self):
1281+
job_config = self._make_one()
1282+
self.assertEqual(job_config.max_slots, None)
1283+
1284+
def test_max_slots_set_and_clear(self):
1285+
job_config = self._make_one()
1286+
job_config.max_slots = 14
1287+
self.assertEqual(job_config.max_slots, 14)
1288+
job_config.max_slots = None
1289+
self.assertEqual(job_config.max_slots, None)
1290+
1291+
def test_max_slots_hit_str(self):
1292+
job_config = self._make_one()
1293+
job_config._properties["maxSlots"] = "4"
1294+
self.assertEqual(job_config.max_slots, 4)
1295+
1296+
def test_max_slots_hit_int(self):
1297+
job_config = self._make_one()
1298+
job_config._properties["maxSlots"] = int(3)
1299+
self.assertEqual(job_config.max_slots, 3)
1300+
1301+
def test_max_slots_hit_invalid(self):
1302+
job_config = self._make_one()
1303+
job_config._properties["maxSlots"] = object()
1304+
self.assertEqual(job_config.max_slots, None)
1305+
1306+
def test_max_slots_update_in_place(self):
1307+
job_config = self._make_one()
1308+
job_config.max_slots = 45 # update in place
1309+
self.assertEqual(job_config.max_slots, 45)
1310+
1311+
def test_max_slots_setter_invalid(self):
1312+
job_config = self._make_one()
1313+
with self.assertRaises(ValueError):
1314+
job_config.max_slots = "foo"
1315+
1316+
def test_max_slots_setter(self):
1317+
job_config = self._make_one()
1318+
job_config.max_slots = 123
1319+
self.assertEqual(job_config._properties["maxSlots"], "123")

tests/unit/job/test_query_config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ def test_incremental_results(self):
172172
config.write_incremental_results = True
173173
self.assertEqual(config.write_incremental_results, True)
174174

175+
def test_max_slots(self):
176+
config = self._get_target_class()()
177+
config.max_slots = 99
178+
self.assertEqual(config.max_slots, 99)
179+
175180
def test_create_session(self):
176181
config = self._get_target_class()()
177182
self.assertIsNone(config.create_session)

tests/unit/test__job_helpers.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,19 @@ def make_query_response(
200200
make_query_request({"writeIncrementalResults": True}),
201201
id="job_config-with-incremental-results",
202202
),
203+
pytest.param(
204+
job_query.QueryJobConfig(
205+
reservation="foo",
206+
max_slots=100,
207+
),
208+
make_query_request(
209+
{
210+
"maxSlots": "100",
211+
"reservation": "foo",
212+
}
213+
),
214+
id="job_config-with-reservation-and-slots",
215+
),
203216
),
204217
)
205218
def test__to_query_request(job_config, expected):
@@ -1048,6 +1061,21 @@ def test_make_job_id_w_job_id_overrides_prefix():
10481061
True,
10491062
id="write_incremental_results",
10501063
),
1064+
pytest.param(
1065+
job_query.QueryJobConfig(job_timeout_ms=1000),
1066+
True,
1067+
id="job_timeout_ms",
1068+
),
1069+
pytest.param(
1070+
job_query.QueryJobConfig(reservation="foo"),
1071+
True,
1072+
id="reservation",
1073+
),
1074+
pytest.param(
1075+
job_query.QueryJobConfig(max_slots=20),
1076+
True,
1077+
id="max_slots",
1078+
),
10511079
),
10521080
)
10531081
def test_supported_by_jobs_query_from_queryjobconfig(

0 commit comments

Comments
 (0)