Add BigQuery Storage Read API Enrichment Handler#35197
pandasanjay wants to merge 19 commits into apache:master
Conversation
- Created `bigquery_storage_read_it_test.py` for integration tests covering various enrichment scenarios including single and batch enrichment, column aliasing, and error handling.
- Added `bigquery_storage_read_test.py` for unit tests focusing on handler initialization, condition value extraction, filter building, and batch processing.
- Implemented tests for edge cases such as no matches, invalid column references, and latest value selection.
Hi @sjvanrossum, 👋 Could you please take a look at this pull request when you have a moment? It introduces a new `BigQueryStorageEnrichmentHandler`. The PR includes the handler implementation and accompanying documentation. Please review the changes and let me know if anything needs an update or further clarification. Thanks!
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers` anyway.
Please fix the failed workflows.
Thanks for the PR, @pandasanjay 🙏 You might want to take a look at the CI test results. Also, this guide could help address some of the linting issues:

Additionally, you can check out my PR for the CloudSQL Enrichment Handler (#34398). I believe some of the feedback there could be relevant and applicable here as well.
Thank you @mohamedawnallah for providing additional details! 👍 I'm planning to make a few more documentation updates similar to yours. It looks like the pipeline failures aren't related to my changes; I'm currently investigating the linting issues.

Some CI tests can occasionally be flaky, but this doesn't happen very often. To minimize issues, make sure you're using the latest code from the Beam upstream.
- Implemented the `BigQueryStorageEnrichmentHandler` for enriching data using the BigQuery Storage Read API.
- Added unit tests to validate the functionality of the enrichment handler, including various scenarios such as single and batch processing, condition value functions, and column aliasing.
- Created a new documentation page for the BigQuery Storage Read API enrichment, detailing usage, examples, and configuration options.
- Updated existing documentation to include links to the new enrichment handler and examples.
…ity and consistency

- Adjusted indentation and formatting for better clarity.
- Consolidated test setup and handler creation into dedicated methods.
- Enhanced error handling tests for invalid arguments.
- Verified condition value extraction and renaming functionality.
- Ensured aliasing behavior is correctly tested for both single and batch processing.
- Added assertions to confirm original column names are not present in aliased responses.
…ryStorageEnrichmentHandler
assign set of reviewers
Assigning reviewers: R: @damccorm for label python.

Note: If you would like to opt out of this review, comment `assign to next reviewer`.

Available commands:

The PR bot will only process comments in the main thread (not review comments).
Looks like there is a need for a BQ table to make the examples pipeline pass.

-- BigQuery DDL for enrichment_with_bigquery_storage_basic

If this can be configured in code, please let me know. @damccorm could you please help here? Thanks :)
@pandasanjay – It looks like the example pipelines fail with this error:

Given this needs to be installed, have you tested this with a BQ table in your Google Cloud project, and do the tests pass?
```python
try:
  with concurrent.futures.ThreadPoolExecutor(
      max_workers=max_workers) as executor:
    for stream in session.streams:
      futures.append(
          executor.submit(_read_single_stream_worker, stream.name))
    for future in concurrent.futures.as_completed(futures):
      try:
        all_bq_rows_original_keys.extend(future.result())
      except Exception as e:
        logger.error("Error processing future result: %s", e, exc_info=True)
except Exception as pool_error:
  logger.error("ThreadPool error: %s", pool_error, exc_info=True)
logger.debug("Fetched %s rows from BQ.", len(all_bq_rows_original_keys))
return all_bq_rows_original_keys
```
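The fragment above fans out one worker per stream. A small, illustrative helper (not from the PR; `max_parallel_streams` is the knob named in the PR description, the helper itself is an assumption) shows how the worker count could be bounded so the pool never exceeds the configured concurrency:

```python
# Illustrative only: bound the executor's worker count by both the number
# of streams in the read session and the configured max_parallel_streams.
def effective_workers(num_streams, max_parallel_streams):
  """Return a worker count in [1, max_parallel_streams], never above num_streams."""
  return max(1, min(num_streams, max_parallel_streams))

print(effective_workers(8, 4))  # 4
print(effective_workers(2, 4))  # 2
print(effective_workers(0, 4))  # 1
```

The computed value would then be passed as `max_workers` to the `ThreadPoolExecutor`.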
Hi @tvalentyn
Sorry for the late reply.
We recently deployed this version of the code in production, but it didn’t work well due to conflicts between the thread pool used for processing connection streams and the Dataflow process.
If you have any suggestions or ideas on handling stream connections more effectively, they would be greatly appreciated. If there’s a better approach, we’d love to hear your thoughts.
To address the issues we encountered, we switched to a custom transform, which has been performing well. I will add that to this pull request soon.
Best regards.
thanks, i'd defer to other reviewers first who looked more closely into your change and have context. happy to take another look if you don't get a response
Hi @tvalentyn,
Apologies for the delayed response! I appreciate your suggestion.
Hi @robertwb and @lukecwik, I found your names in the git commit. You have worked on part of building the enrichment transform. I need your expertise to solve a problem. Could you please review this PR, especially the concurrent stream creation section, and suggest improvements? If you think anyone else knows better about this, please tag them. Thanks :)
Hey @pandasanjay I can continue with this review. Could we please start with my comments here though - https://github.com/apache/beam/pull/35197/files#r2158863302 ? It has implications on the whole PR (I'm not sure whether we need the whole change, though you did call out some good pieces for us to improve)
waiting on author
hey @pandasanjay, is this PR still relevant?
Hi @ahmedabu98, yes. I explored two approaches internally:

I'm currently on annual leave and will push the changes when I return. If you have suggestions while I'm away, please share and I'll incorporate them once I'm back. Thanks!
Reminder, please take a look at this PR: @tvalentyn @kennknowles
waiting on author
…ture/enrichment_bigquery_storage_read
Pull Request Overview
This PR introduces a new BigQueryStorageEnrichmentHandler for Apache Beam that leverages the Google Cloud BigQuery Storage Read API for efficient data enrichment. The handler provides a high-performance alternative to traditional SQL-based BigQuery lookups within Beam pipelines, offering features like dynamic filtering, column aliasing, batching, and parallel stream reading.
Key changes:
- Adds new BigQuery Storage Read API enrichment handler with advanced features
- Includes comprehensive unit and integration tests for the new handler
- Updates documentation website with new enrichment example pages
- Adds code examples demonstrating basic and advanced usage patterns
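One of the advertised features is column aliasing via `original_col as alias_col` entries in `column_names`. A hypothetical helper (the function name and parsing rules below are assumptions for illustration, not the PR's actual code) sketches how such entries could be split:

```python
# Hypothetical sketch of "orig as alias" parsing; the real handler's rules
# may differ (e.g. case-insensitive "AS", extra validation).
def parse_column_aliases(column_names):
  """Map each requested BigQuery column to the name it carries in the output Row."""
  aliases = {}
  for entry in column_names:
    parts = entry.split(" as ")
    if len(parts) == 2:
      aliases[parts[0].strip()] = parts[1].strip()
    else:
      aliases[entry.strip()] = entry.strip()
  return aliases

print(parse_column_aliases(["customer_id", "name as customer_name"]))
# {'customer_id': 'customer_id', 'name': 'customer_name'}
```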
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| `sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_storage_read.py` | Core implementation of `BigQueryStorageEnrichmentHandler` with batching, filtering, and parallel reading |
| `sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_storage_read_test.py` | Unit tests for handler functionality including aliasing and batching |
| `sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_storage_read_it_test.py` | Integration tests against live BigQuery instances |
| `sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py` | Code examples for basic and advanced BigQuery enrichment usage |
| `sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py` | Test updates for the new enrichment examples |
| `website/www/site/content/en/documentation/transforms/python/elementwise/enrichment-bigquery-storage.md` | Documentation page for the new BigQuery Storage enrichment handler |
| `website/www/site/content/en/documentation/transforms/python/elementwise/enrichment.md` | Updated main enrichment documentation to reference new handler |
| `website/www/site/layouts/partials/section-menu/en/documentation.html` | Navigation menu update to include new documentation page |
```python
    enrichment_with_bigtable,
    enrichment_with_vertex_ai_legacy,
)
from apache_beam.examples.snippets.transforms.elementwise.enrichment import (
    enrichment_with_vertex_ai,
)
from apache_beam.examples.snippets.transforms.elementwise.enrichment import (
    enrichment_with_bigquery_storage_basic,
    enrichment_with_bigquery_storage_custom_function,
)
except ImportError:
  raise unittest.SkipTest("RequestResponseIO dependencies are not installed")
```
Import statements are incorrectly placed inside a try block. The imports on lines 37-46 should be moved after the existing imports that start at line 49, as they are currently inside a try block that will cause syntax issues.
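The structure the test module appears to be aiming for is the usual guarded-import idiom: optional dependencies are imported inside a single `try` block, and the whole module is skipped when they are missing. A minimal, runnable sketch (using `json` as a stand-in for the real optional dependencies):

```python
import unittest

# Optional dependencies go inside one try block; a missing dependency skips
# the whole test module instead of raising at import time.
try:
  import json  # stand-in for the RequestResponseIO/enrichment imports
except ImportError:
  raise unittest.SkipTest("RequestResponseIO dependencies are not installed")

print(json.dumps({"deps": "available"}))  # {"deps": "available"}
```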
```python
return expected


@mock.patch("sys.stdout", new_callable=StringIO)
```
Function definition is missing proper indentation and parameter. The function should be a method with self parameter and proper indentation within the test class, not a standalone function with a decorator.
Suggested change:

```python
@mock.patch("sys.stdout", new_callable=StringIO)
```
```python
from apitools.base.py.exceptions import HttpError
from google.api_core.exceptions import BadRequest, GoogleAPICallError

# Removed NotFound from import as it is unused
```
Comment indicates NotFound was removed due to being unused, but NotFound is actually imported and used on line 42 and line 451. This comment is misleading and should be removed.
```python
# --- Store new parameters ---
self._latest_value_callback = latest_value_selector
self.primary_keys = primary_keys
# --- End store ---
```
[nitpick] Comments like '--- Store new parameters ---' and '--- End store ---' are unnecessary and don't add value. These should be removed for cleaner code.
Suggested change:

```python
self._latest_value_callback = latest_value_selector
self.primary_keys = primary_keys
```
```python
# TODO: Add proper caching functionality with TTL, cache size limits,
# and configurable cache policies to improve performance and reduce
# BigQuery API calls for repeated requests.
```
[nitpick] TODO comments should include issue numbers or assignees for tracking. Consider creating a GitHub issue for this caching enhancement and referencing it in the TODO.
Suggested change:

```python
# TODO(#1234): Add proper caching functionality with TTL, cache size limits,
# and configurable cache policies to improve performance and reduce
# BigQuery API calls for repeated requests. See: https://github.com/apache/beam/issues/1234
```
…ementwise/enrichment.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
waiting on author
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

#35196
Description:
This pull request introduces a new `EnrichmentSourceHandler` for Apache Beam, `BigQueryStorageEnrichmentHandler`, designed to leverage the Google Cloud BigQuery Storage Read API for efficient data enrichment. This handler provides a high-performance alternative to traditional SQL-based BigQuery lookups within Beam pipelines.

Motivation and Context:
Enriching data by joining PCollection elements with data stored in BigQuery is a common use case. While existing methods often rely on executing SQL queries, the BigQuery Storage Read API offers a more direct and typically faster way to retrieve data, especially for large volumes or when fine-grained row-level access is needed. This handler aims to:
Key Features and Improvements:
The `BigQueryStorageEnrichmentHandler` offers several enhancements:

- Filtering via a static `row_restriction_template`, or dynamically via a `row_restriction_template_fn`.
- `fields`: Specifies input `beam.Row` fields for generating join keys and for use in filter templates.
- `additional_condition_fields`: Allows using input fields for filtering without including them in the join key.
- `condition_value_fn`: Provides complete control over generating the dictionary of values used for both filtering and join key creation.
- Column aliasing (`original_col as alias_col` in `column_names`) to prevent naming conflicts in the enriched `beam.Row`.
- Batching of requests into fewer `CreateReadSession` calls, reducing API overhead. Batch size and duration are configurable (`min_batch_size`, `max_batch_size`, `max_batch_duration_secs`).
- A `ThreadPoolExecutor` to read data from multiple streams of a BigQuery Read Session in parallel, potentially improving data fetching throughput. Concurrency is configurable via `max_parallel_streams`.
- A `latest_value_selector` callback that allows users to define custom logic for selecting the desired row when multiple BigQuery rows match a single input request (e.g., picking the record with the most recent timestamp). `primary_keys` can be used by this selector.
- Built on the `BigQueryReadClient`.

Advantages over Traditional SQL-based BigQuery Enrichment:
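As a concrete illustration of the batching advantage, many per-element SQL lookups can collapse into a single `CreateReadSession` call whose row restriction ORs together one clause per request. The pure-Python sketch below is illustrative only; the handler's actual template syntax and batching code may differ:

```python
# Illustrative: fold a batch of requests into one row restriction string.
def build_row_restriction(template, requests, fields):
  """Build one OR-joined clause per request, suitable for a single read session."""
  clauses = []
  for req in requests:
    values = {f: req[f] for f in fields}
    clauses.append("(" + template.format(**values) + ")")
  return " OR ".join(clauses)

restriction = build_row_restriction(
    "customer_id = {customer_id}",
    [{"customer_id": 101}, {"customer_id": 102}],
    ["customer_id"])
print(restriction)  # (customer_id = 101) OR (customer_id = 102)
```

The resulting string would play the role of the `row_restriction` sent with the read session, so one API call serves the whole batch.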
Documentation:

Comprehensive documentation for this handler, including usage examples, parameter descriptions, features, and limitations, has been added in `docs/bigquery_storage_enrichment_handler.md`.

Implementation Details:
The handler (`sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_storage_read.py`) manages `BigQueryReadClient` instances, constructs `ReadSession` requests with appropriate row restrictions and selected fields, and processes the resulting Arrow record batches. It integrates with Beam's `Enrichment` transform, providing batching and caching key generation.

Testing Considerations:
This handler provides a powerful and efficient way to enrich data in Apache Beam pipelines using BigQuery.
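The Arrow record batches mentioned under Implementation Details are column-oriented, while enrichment ultimately needs one row dict per match. A dependency-free sketch of that conversion step (the handler itself would receive Arrow data from the Storage Read API, not plain dicts):

```python
# Illustrative column-to-row conversion, mimicking what RecordBatch.to_pylist()
# would produce from an Arrow batch.
def columns_to_rows(columns):
  """Turn {"col": [v0, v1, ...]} into [{"col": v0}, {"col": v1}, ...]."""
  names = list(columns)
  length = len(next(iter(columns.values()), []))
  return [{name: columns[name][i] for name in names} for i in range(length)]

print(columns_to_rows({"id": [1, 2], "name": ["a", "b"]}))
# [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
```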
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (e.g. `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- Update `CHANGES.md` with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.