Skip to content

Commit 63a8be9

Browse files
Handle query returned with empty rows gracefully on bigquery enrichment (#36791)
* Handle query returned with empty rows gracefully on bigquery enrichment handler * Ran isort linter * Manually update import order * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fix typo * Updated string formatting and added a test case * Clean up logic in checking unmatched requests --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 6ba393a commit 63a8be9

File tree

2 files changed

+172
-7
lines changed

2 files changed

+172
-7
lines changed

sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
#
17+
import logging
1718
from collections.abc import Callable
1819
from collections.abc import Mapping
1920
from typing import Any
@@ -30,6 +31,8 @@
3031
QueryFn = Callable[[beam.Row], str]
3132
ConditionValueFn = Callable[[beam.Row], list[Any]]
3233

34+
_LOGGER = logging.getLogger(__name__)
35+
3336

3437
def _validate_bigquery_metadata(
3538
table_name, row_restriction_template, fields, condition_value_fn, query_fn):
@@ -87,6 +90,7 @@ def __init__(
8790
query_fn: Optional[QueryFn] = None,
8891
min_batch_size: int = 1,
8992
max_batch_size: int = 10000,
93+
throw_exception_on_empty_results: bool = True,
9094
**kwargs,
9195
):
9296
"""
@@ -145,6 +149,7 @@ def __init__(
145149
self.query_template = (
146150
"SELECT %s FROM %s WHERE %s" %
147151
(self.select_fields, self.table_name, self.row_restriction_template))
152+
self.throw_exception_on_empty_results = throw_exception_on_empty_results
148153
self.kwargs = kwargs
149154
self._batching_kwargs = {}
150155
if not query_fn:
@@ -157,10 +162,13 @@ def __enter__(self):
157162
def _execute_query(self, query: str):
158163
try:
159164
results = self.client.query(query=query).result()
165+
row_list = [dict(row.items()) for row in results]
166+
if not row_list:
167+
return None
160168
if self._batching_kwargs:
161-
return [dict(row.items()) for row in results]
169+
return row_list
162170
else:
163-
return [dict(row.items()) for row in results][0]
171+
return row_list[0]
164172
except BadRequest as e:
165173
raise BadRequest(
166174
f'Could not execute the query: {query}. Please check if '
@@ -204,11 +212,21 @@ def __call__(self, request: Union[beam.Row, list[beam.Row]], *args, **kwargs):
204212
query = raw_query.format(*values)
205213

206214
responses_dict = self._execute_query(query)
207-
for response in responses_dict:
208-
response_row = beam.Row(**response)
209-
response_key = self.create_row_key(response_row)
210-
if response_key in requests_map:
211-
responses.append((requests_map[response_key], response_row))
215+
unmatched_requests = requests_map.copy()
216+
if responses_dict:
217+
for response in responses_dict:
218+
response_row = beam.Row(**response)
219+
response_key = self.create_row_key(response_row)
220+
if response_key in unmatched_requests:
221+
req = unmatched_requests.pop(response_key)
222+
responses.append((req, response_row))
223+
if unmatched_requests:
224+
if self.throw_exception_on_empty_results:
225+
raise ValueError(f"no matching row found for query: {query}")
226+
else:
227+
_LOGGER.warning('no matching row found for query: %s', query)
228+
for req in unmatched_requests.values():
229+
responses.append((req, beam.Row()))
212230
return responses
213231
else:
214232
request_dict = request._asdict()
@@ -223,6 +241,12 @@ def __call__(self, request: Union[beam.Row, list[beam.Row]], *args, **kwargs):
223241
# construct the query.
224242
query = self.query_template.format(*values)
225243
response_dict = self._execute_query(query)
244+
if response_dict is None:
245+
if self.throw_exception_on_empty_results:
246+
raise ValueError(f"no matching row found for query: {query}")
247+
else:
248+
_LOGGER.warning('no matching row found for query: %s', query)
249+
return request, beam.Row()
226250
return request, beam.Row(**response_dict)
227251

228252
def __exit__(self, exc_type, exc_val, exc_tb):

sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,147 @@ def test_bigquery_enrichment_with_redis(self):
355355
assert_that(pcoll_cached, equal_to(expected_rows))
356356
BigQueryEnrichmentHandler.__call__ = actual
357357

358+
def test_bigquery_enrichment_no_results_throws_exception(self):
359+
requests = [
360+
beam.Row(id=999, name='X'), # This ID does not exist
361+
]
362+
handler = BigQueryEnrichmentHandler(
363+
project=self.project,
364+
row_restriction_template="id = {}",
365+
table_name=self.table_name,
366+
fields=['id'],
367+
throw_exception_on_empty_results=True,
368+
)
369+
370+
with self.assertRaisesRegex(ValueError, "no matching row found for query"):
371+
with TestPipeline(is_integration_test=True) as test_pipeline:
372+
_ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
373+
374+
def test_bigquery_enrichment_no_results_graceful(self):
375+
requests = [
376+
beam.Row(id=999, name='X'), # This ID does not exist
377+
beam.Row(id=1000, name='Y'), # This ID does not exist
378+
]
379+
# When no results are found and not throwing, Enrichment yields original.
380+
expected_rows = requests
381+
382+
handler = BigQueryEnrichmentHandler(
383+
project=self.project,
384+
row_restriction_template="id = {}",
385+
table_name=self.table_name,
386+
fields=['id'],
387+
min_batch_size=1,
388+
max_batch_size=100,
389+
throw_exception_on_empty_results=False,
390+
)
391+
392+
with TestPipeline(is_integration_test=True) as test_pipeline:
393+
pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
394+
assert_that(pcoll, equal_to(expected_rows))
395+
396+
def test_bigquery_enrichment_no_results_partial_graceful_batched(self):
397+
requests = [
398+
beam.Row(id=1, name='A'), # This ID exists
399+
beam.Row(id=1000, name='Y'), # This ID does not exist
400+
]
401+
# Matched requests are enriched; unmatched ones are yielded unchanged.
402+
expected_rows = [
403+
beam.Row(id=1, name='A', quantity=2, distribution_center_id=3),
404+
beam.Row(id=1000,
405+
name='Y'), # This ID does not exist so remains unchanged
406+
]
407+
408+
handler = BigQueryEnrichmentHandler(
409+
project=self.project,
410+
row_restriction_template="id = {}",
411+
table_name=self.table_name,
412+
fields=['id'],
413+
min_batch_size=2,
414+
max_batch_size=100,
415+
throw_exception_on_empty_results=False,
416+
)
417+
418+
with TestPipeline(is_integration_test=True) as test_pipeline:
419+
pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
420+
assert_that(pcoll, equal_to(expected_rows))
421+
422+
def test_bigquery_enrichment_no_results_graceful_batched(self):
423+
requests = [
424+
beam.Row(id=999, name='X'), # This ID does not exist
425+
beam.Row(id=1000, name='Y'), # This ID does not exist
426+
]
427+
# When no results are found and not throwing, Enrichment yields original.
428+
expected_rows = requests
429+
430+
handler = BigQueryEnrichmentHandler(
431+
project=self.project,
432+
row_restriction_template="id = {}",
433+
table_name=self.table_name,
434+
fields=['id'],
435+
min_batch_size=2,
436+
max_batch_size=100,
437+
throw_exception_on_empty_results=False,
438+
)
439+
440+
with TestPipeline(is_integration_test=True) as test_pipeline:
441+
pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
442+
assert_that(pcoll, equal_to(expected_rows))
443+
444+
def test_bigquery_enrichment_no_results_with_query_fn_throws_exception(self):
445+
requests = [
446+
beam.Row(id=999, name='X'), # This ID does not exist
447+
]
448+
# This query_fn will return no results
449+
fn = functools.partial(query_fn, self.table_name)
450+
handler = BigQueryEnrichmentHandler(
451+
project=self.project,
452+
query_fn=fn,
453+
throw_exception_on_empty_results=True,
454+
)
455+
456+
with self.assertRaisesRegex(ValueError, "no matching row found for query"):
457+
with TestPipeline(is_integration_test=True) as test_pipeline:
458+
_ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
459+
460+
def test_bigquery_enrichment_no_results_with_query_fn_graceful(self):
461+
requests = [
462+
beam.Row(id=999, name='X'), # This ID does not exist
463+
beam.Row(id=1000, name='Y'), # This ID does not exist
464+
]
465+
# When no results are found and not throwing, Enrichment yields original.
466+
expected_rows = requests
467+
468+
# This query_fn will return no results
469+
fn = functools.partial(query_fn, self.table_name)
470+
handler = BigQueryEnrichmentHandler(
471+
project=self.project,
472+
query_fn=fn,
473+
throw_exception_on_empty_results=False,
474+
)
475+
476+
with TestPipeline(is_integration_test=True) as test_pipeline:
477+
pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
478+
assert_that(pcoll, equal_to(expected_rows))
479+
480+
def test_bigquery_enrichment_partial_results_throws_exception_batched(self):
481+
requests = [
482+
beam.Row(id=1, name='A'), # This ID exists
483+
beam.Row(id=1000, name='Y'), # This ID does not exist
484+
]
485+
handler = BigQueryEnrichmentHandler(
486+
project=self.project,
487+
row_restriction_template="id = {}",
488+
table_name=self.table_name,
489+
fields=['id'],
490+
min_batch_size=2,
491+
max_batch_size=100,
492+
throw_exception_on_empty_results=True,
493+
)
494+
495+
with self.assertRaisesRegex(ValueError, "no matching row found for query"):
496+
with TestPipeline(is_integration_test=True) as test_pipeline:
497+
_ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
498+
358499

359500
if __name__ == '__main__':
360501
unittest.main()

0 commit comments

Comments
 (0)