Fix temp table cleanup for continuous read in BigQueryIO #37535
stankiewicz wants to merge 2 commits into apache:master
Conversation
Summary of Changes
Hello @stankiewicz, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request addresses a critical issue in BigQueryIO where temporary tables created for continuous dynamic reads in unbounded streaming pipelines were not being reliably cleaned up, leading to resource leakage. The solution implements a robust, state-based mechanism to accurately track the completion of parallel BigQuery Storage API streams. By introducing new DoFns and state management, the system now ensures that temporary resources are only deleted when they are truly no longer needed, preventing orphaned BigQuery storage resources and improving pipeline efficiency.
Activity
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment.
Assigning reviewers: R: @Abacn for label java. Note: If you would like to opt out of this review, comment.

Available commands:

The PR bot will only process comments in the main thread (not review comments).
[BigQuery] Fix temporary table leakage during continuous dynamic reads
🚨 Problem
When using BigQueryIO for continuous dynamic reads in unbounded streaming pipelines, temporary tables created to hold query results were not being reliably cleaned up. The original implementation either deleted the temporary table too early, before parallel workers had fully consumed the streams, or lacked any mechanism to track stream completions across parallel workers. As a result, long-running unbounded streaming jobs would leak BigQuery storage resources, leaving behind orphaned temporary datasets and tables.
Cleanup based on the global window is insufficient in this unbounded streaming scenario; a more granular stream-tracking mechanism is required.
🛠️ Solution
This PR implements a robust, state-based cleanup mechanism that accurately tracks when parallel workers finish reading from BigQuery Storage API streams, deleting the temporary datasets and tables only when they are truly no longer needed.
Key Changes:
- A ValueState to track the total number of streams created for a given query job, alongside a counter for how many streams have successfully completed.
- A CleanupOperationMessage.initialize() message containing metadata (CleanupInfo with projectId, datasetId, tableId, and totalStreams).
- A CleanupOperationMessage.streamComplete() signal sent to the cleanup DoFn.
- A .withFromQuery() context toggle on BigQueryStorageStreamSource so consumers know if the stream is bound to a temporary query table that requires tracking.
- String fields to prevent NotSerializableExceptions during state persistence and shuffling.

A sketch of how these pieces fit together follows the list.
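For illustration only, here is a minimal sketch of the counting flow, assuming a stateful DoFn keyed by the query job id. The CleanupInfo/CleanupOperationMessage shapes, their accessor names, and the deleteTableAndDataset helper are assumptions made for this sketch and are not the code in this PR; only the Beam state primitives (StateSpec, StateSpecs.value(), ValueState, @StateId) are the real API.

```java
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Assumed shape of the PR's CleanupInfo metadata (illustrative only). */
interface CleanupInfo {
  String getProjectId();

  String getDatasetId();

  String getTableId();

  int getTotalStreams();
}

/** Assumed shape of the PR's CleanupOperationMessage (illustrative only). */
interface CleanupOperationMessage {
  boolean isInitialize();

  CleanupInfo getCleanupInfo();
}

/** Sketch of a stateful cleanup DoFn, keyed by the query job id. */
class CleanupTempTableDoFnSketch extends DoFn<KV<String, CleanupOperationMessage>, Void> {

  // Written once by the initialize() message announcing the temp table and totalStreams.
  @StateId("cleanupInfo")
  private final StateSpec<ValueState<CleanupInfo>> cleanupInfoSpec = StateSpecs.value();

  // Running count of streamComplete() signals seen for this key.
  @StateId("completedStreams")
  private final StateSpec<ValueState<Integer>> completedStreamsSpec = StateSpecs.value();

  @ProcessElement
  public void processElement(
      @Element KV<String, CleanupOperationMessage> element,
      @StateId("cleanupInfo") ValueState<CleanupInfo> cleanupInfoState,
      @StateId("completedStreams") ValueState<Integer> completedStreamsState) {

    CleanupOperationMessage message = element.getValue();
    if (message.isInitialize()) {
      cleanupInfoState.write(message.getCleanupInfo());
    } else {
      Integer soFar = completedStreamsState.read();
      completedStreamsState.write((soFar == null ? 0 : soFar) + 1);
    }

    // Re-check after every message so the result does not depend on the order in which
    // the initialize() metadata and the streamComplete() signals arrive.
    CleanupInfo info = cleanupInfoState.read();
    Integer completed = completedStreamsState.read();
    if (info != null && completed != null && completed >= info.getTotalStreams()) {
      // Every parallel reader has reported completion; the temp resources can go.
      deleteTableAndDataset(info.getProjectId(), info.getDatasetId(), info.getTableId());
    }
  }

  private void deleteTableAndDataset(String projectId, String datasetId, String tableId) {
    // Placeholder: the real transform would delete via BigQueryServices.DatasetService.
  }
}
```

Keying by the query job id routes every signal for the same temporary table to the same state cells, and re-checking after every message keeps the logic independent of the order in which the initialize() metadata and the streamComplete() signals arrive.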
🧪 Testing

- CleanupTempTableDoFnTest: a dedicated unit test utilizing FakeDatasetService and FakeBigQueryServices. It validates that the deleteTable/deleteDataset methods are invoked exactly once after the final stream completes.
- spotlessApply and compileJava pass locally without format or syntax violations.

An outline of the test's structure is sketched after this list.
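As orientation only, here is a hedged outline of what such a test could look like with Beam's TestPipeline. The message-building helpers and the DoFn construction are hypothetical placeholders, a real test would also register a coder for the message type, and the actual CleanupTempTableDoFnTest in this PR may differ substantially.

```java
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.junit.Rule;
import org.junit.Test;

public class CleanupTempTableDoFnSketchTest {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void tempTableDeletedOnlyAfterLastStreamCompletes() {
    // Two streams are announced up front, then each reports completion.
    pipeline
        .apply(
            Create.of(
                KV.of("query-job-1", initializeMessage(/* totalStreams= */ 2)),
                KV.of("query-job-1", streamCompleteMessage()),
                KV.of("query-job-1", streamCompleteMessage())))
        .apply(ParDo.of(new CleanupTempTableDoFnSketch()));

    pipeline.run().waitUntilFinish();

    // In the PR's CleanupTempTableDoFnTest, FakeBigQueryServices/FakeDatasetService stand in
    // for the real services, and the assertion is that deleteTable and deleteDataset are
    // invoked exactly once, only after the final streamComplete() signal.
  }

  // Hypothetical builders standing in for CleanupOperationMessage.initialize(...) and
  // CleanupOperationMessage.streamComplete(); their real signatures may differ.
  private static CleanupOperationMessage initializeMessage(int totalStreams) {
    throw new UnsupportedOperationException("placeholder for this sketch");
  }

  private static CleanupOperationMessage streamCompleteMessage() {
    throw new UnsupportedOperationException("placeholder for this sketch");
  }
}
```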
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.