
Commit c2c9275

[Python] Added the validate support for ReadAllFromBigQuery (#34910)
* added validate
* updated the doc strings
1 parent 1f31eb0 commit c2c9275

File tree

2 files changed: +10 additions, -4 deletions

sdks/python/apache_beam/io/gcp/bigquery.py

Lines changed: 5 additions & 2 deletions
@@ -2744,7 +2744,8 @@ class ReadFromBigQuery(PTransform):
       :data:`True` for most scenarios in order to catch errors as early as
       possible (pipeline construction instead of pipeline execution). It
       should be :data:`False` if the table is created during pipeline
-      execution by a previous step.
+      execution by a previous step. Set this to :data:`False`
+      if the BigQuery export method is slow due to checking file existence.
     coder (~apache_beam.coders.coders.Coder): The coder for the table
       rows. If :data:`None`, then the default coder is
       _JsonToDictCoder, which will interpret every row as a JSON
@@ -3033,7 +3034,8 @@ class ReadAllFromBigQuery(PTransform):
       bucket where the extracted table should be written as a string. If
       :data:`None`, then the temp_location parameter is used.
     validate (bool): If :data:`True`, various checks will be done when source
-      gets initialized (e.g., is table present?).
+      gets initialized (e.g., is table present?). Set this to :data:`False`
+      if the BigQuery export method is slow due to checking file existence.
     kms_key (str): Experimental. Optional Cloud KMS key name for use when
       creating new temporary tables.
   """
@@ -3078,6 +3080,7 @@ def expand(self, pcoll):
         _BigQueryReadSplit(
             options=pcoll.pipeline.options,
             gcs_location=self.gcs_location,
+            validate=self.validate,
             bigquery_job_labels=self.bigquery_job_labels,
             job_name=job_name,
             step_name=step_name,
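For context, a minimal usage sketch of the parameter being documented above. It assumes the ReadAllFromBigQuery constructor exposes the validate keyword described in the docstring, and that the pipeline options carry a GCS temp_location; the project, dataset, and table names are placeholders.

import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadAllFromBigQuery, ReadFromBigQueryRequest

with beam.Pipeline() as pipeline:
  rows = (
      pipeline
      | 'CreateRequests' >> beam.Create([
          ReadFromBigQueryRequest(table='my-project:my_dataset.my_table'),
      ])
      # validate=False skips the construction-time checks (table presence,
      # export file existence) and defers any failure to pipeline execution.
      | 'ReadAll' >> ReadAllFromBigQuery(validate=False)
      | 'Print' >> beam.Map(print))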

sdks/python/apache_beam/io/gcp/bigquery_read_internal.py

Lines changed: 5 additions & 2 deletions
@@ -195,6 +195,7 @@ def __init__(
       self,
       options: PipelineOptions,
       gcs_location: Union[str, ValueProvider] = None,
+      validate: bool = False,
       use_json_exports: bool = False,
       bigquery_job_labels: Dict[str, str] = None,
       step_name: str = None,
@@ -205,6 +206,7 @@ def __init__(
       temp_dataset: Union[str, DatasetReference] = None,
       query_priority: Optional[str] = None):
     self.options = options
+    self.validate = validate
     self.use_json_exports = use_json_exports
     self.gcs_location = gcs_location
     self.bigquery_job_labels = bigquery_job_labels or {}
@@ -285,14 +287,15 @@ def _get_bq_metadata(self):
 
   def _create_source(self, path, schema):
     if not self.use_json_exports:
-      return _create_avro_source(path)
+      return _create_avro_source(path, validate=self.validate)
     else:
       return _TextSource(
           path,
           min_bundle_size=0,
           compression_type=CompressionTypes.UNCOMPRESSED,
           strip_trailing_newlines=True,
-          coder=_JsonToDictCoder(schema))
+          coder=_JsonToDictCoder(schema),
+          validate=self.validate)
 
   def _setup_temporary_dataset(
       self,
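The helpers touched here (_create_avro_source, _TextSource) are internal, but Beam's public file-based read transforms expose the same validate switch. A hedged sketch of the same pass-through pattern using the public transforms; read_export is a hypothetical helper and the GCS path is a placeholder.

import apache_beam as beam
from apache_beam.io.avroio import ReadFromAvro
from apache_beam.io.textio import ReadFromText

def read_export(pipeline, path, use_json_exports=False, validate=False):
  # Mirrors the pattern in _create_source above: the caller's validate flag
  # is forwarded unchanged, so validate=False skips the check that files
  # matching the pattern already exist when the source is constructed.
  if not use_json_exports:
    return pipeline | 'ReadAvro' >> ReadFromAvro(path, validate=validate)
  return pipeline | 'ReadJson' >> ReadFromText(path, validate=validate)

with beam.Pipeline() as p:
  rows = read_export(p, 'gs://my-bucket/bq-export/*.avro', validate=False)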
