
Commit c0bde2b

Merge pull request #12782 from [BEAM-10950] Overriding Dataflow Native BQSource.

* Overriding Dataflow Native BQSource.
* Fixing tests
* Trying to fix to_runner_proto transform
* fixup
* fixup
* fixup
* adding comment
1 parent 34a2caf commit c0bde2b


8 files changed: +137 -64 lines changed


sdks/python/apache_beam/examples/snippets/snippets_test.py

Lines changed: 1 addition & 0 deletions
@@ -682,6 +682,7 @@ def test_model_bigqueryio(self):
       snippets.model_bigqueryio(p, project, dataset, table)
     else:
       p = TestPipeline()
+      p.options.view_as(GoogleCloudOptions).temp_location = 'gs://mylocation'
       snippets.model_bigqueryio(p)
 
   def _run_test_pipeline_for_options(self, fn):
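
The added line matters because BigQuery IO built on ReadFromBigQuery/WriteToBigQuery stages files under a GCS temp location, so even a TestPipeline needs one. A minimal sketch of the same setup, with 'gs://mylocation' as a placeholder bucket:

# Sketch: give a test pipeline the temp_location that file-based
# BigQuery IO needs; the bucket name is a placeholder.
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.testing.test_pipeline import TestPipeline

p = TestPipeline()
p.options.view_as(GoogleCloudOptions).temp_location = 'gs://mylocation'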

sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py

Lines changed: 1 addition & 0 deletions
@@ -302,6 +302,7 @@ def test_big_query_new_types_native(self):
         'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
         'use_standard_sql': False,
         'native': True,
+        'use_json_exports': True,
         'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS,
         'on_success_matcher': all_of(*pipeline_verifiers),
         'experiments': 'use_legacy_bq_sink',

sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py

Lines changed: 10 additions & 18 deletions
@@ -91,24 +91,16 @@ def run_bq_pipeline(argv=None):
       use_standard_sql=known_args.use_standard_sql,
       use_json_exports=known_args.use_json_exports,
       kms_key=kms_key)
-  if known_args.native:
-    _ = data | 'write' >> beam.io.Write(
-        beam.io.BigQuerySink(
-            known_args.output,
-            schema=table_schema,
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
-            kms_key=kms_key))
-  else:
-    temp_file_format = (
-        'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
-    _ = data | 'write' >> beam.io.WriteToBigQuery(
-        known_args.output,
-        schema=table_schema,
-        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-        write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
-        temp_file_format=temp_file_format,
-        kms_key=kms_key)
+
+  temp_file_format = (
+      'NEWLINE_DELIMITED_JSON' if known_args.use_json_exports else 'AVRO')
+  _ = data | 'write' >> beam.io.WriteToBigQuery(
+      known_args.output,
+      schema=table_schema,
+      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+      write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
+      temp_file_format=temp_file_format,
+      kms_key=kms_key)
 
   result = p.run()
   result.wait_until_finish()
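
With the native branch gone, every run goes through WriteToBigQuery, and the export format is the only knob left. A hedged standalone sketch of the surviving write path; the table and one-field schema strings are placeholders:

# Hedged sketch of the same write outside the test pipeline;
# 'project:dataset.table' and 'fruit:STRING' are placeholders.
import apache_beam as beam

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'fruit': 'mango'}])
      | beam.io.WriteToBigQuery(
          'project:dataset.table',
          schema='fruit:STRING',
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
          temp_file_format='NEWLINE_DELIMITED_JSON'))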

sdks/python/apache_beam/io/gcp/bigquery.py

Lines changed: 39 additions & 1 deletion
@@ -426,7 +426,45 @@ def validate_write(disposition):
 # BigQuerySource, BigQuerySink.
 
 
-class BigQuerySource(dataflow_io.NativeSource):
+@deprecated(since='2.25.0', current="ReadFromBigQuery")
+def BigQuerySource(
+    table=None,
+    dataset=None,
+    project=None,
+    query=None,
+    validate=False,
+    coder=None,
+    use_standard_sql=False,
+    flatten_results=True,
+    kms_key=None,
+    use_dataflow_native_source=False):
+  if use_dataflow_native_source:
+    return _BigQuerySource(
+        table,
+        dataset,
+        project,
+        query,
+        validate,
+        coder,
+        use_standard_sql,
+        flatten_results,
+        kms_key)
+  else:
+    return ReadFromBigQuery(
+        table=table,
+        dataset=dataset,
+        project=project,
+        query=query,
+        validate=validate,
+        coder=coder,
+        use_standard_sql=use_standard_sql,
+        flatten_results=flatten_results,
+        use_json_exports=True,
+        kms_key=kms_key)
+
+
+@deprecated(since='2.25.0', current="ReadFromBigQuery")
+class _BigQuerySource(dataflow_io.NativeSource):
   """A source based on a BigQuery table."""
   def __init__(
       self,
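
The net effect for callers: beam.io.BigQuerySource is now a factory function. By default it returns a ReadFromBigQuery transform (with JSON exports forced on); only use_dataflow_native_source=True still yields the Dataflow-native source. A short sketch of the two paths, with a placeholder table name:

# Sketch of the new dispatch; 'dataset.table' is a placeholder.
import apache_beam as beam

# Default path: a ReadFromBigQuery composite transform, not a NativeSource.
src = beam.io.BigQuerySource('dataset.table')

# Opt-in path: the legacy Dataflow-native source, as the tests below request.
native_src = beam.io.BigQuerySource(
    'dataset.table', use_dataflow_native_source=True)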

sdks/python/apache_beam/io/gcp/bigquery_test.py

Lines changed: 34 additions & 14 deletions
@@ -168,7 +168,8 @@ def test_invalid_json_neg_inf(self):
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQuerySource(unittest.TestCase):
   def test_display_data_item_on_validate_true(self):
-    source = beam.io.BigQuerySource('dataset.table', validate=True)
+    source = beam.io.BigQuerySource(
+        'dataset.table', validate=True, use_dataflow_native_source=True)
 
     dd = DisplayData.create_from(source)
     expected_items = [
@@ -178,23 +179,26 @@ def test_display_data_item_on_validate_true(self):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_table_reference_display_data(self):
-    source = beam.io.BigQuerySource('dataset.table')
+    source = beam.io.BigQuerySource(
+        'dataset.table', use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', False),
         DisplayDataItemMatcher('table', 'dataset.table')
     ]
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
-    source = beam.io.BigQuerySource('project:dataset.table')
+    source = beam.io.BigQuerySource(
+        'project:dataset.table', use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', False),
         DisplayDataItemMatcher('table', 'project:dataset.table')
     ]
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
-    source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
+    source = beam.io.BigQuerySource(
+        'xyz.com:project:dataset.table', use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', False),
@@ -203,27 +207,32 @@ def test_table_reference_display_data(self):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_parse_table_reference(self):
-    source = beam.io.BigQuerySource('dataset.table')
+    source = beam.io.BigQuerySource(
+        'dataset.table', use_dataflow_native_source=True)
     self.assertEqual(source.table_reference.datasetId, 'dataset')
     self.assertEqual(source.table_reference.tableId, 'table')
 
-    source = beam.io.BigQuerySource('project:dataset.table')
+    source = beam.io.BigQuerySource(
+        'project:dataset.table', use_dataflow_native_source=True)
     self.assertEqual(source.table_reference.projectId, 'project')
     self.assertEqual(source.table_reference.datasetId, 'dataset')
     self.assertEqual(source.table_reference.tableId, 'table')
 
-    source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
+    source = beam.io.BigQuerySource(
+        'xyz.com:project:dataset.table', use_dataflow_native_source=True)
     self.assertEqual(source.table_reference.projectId, 'xyz.com:project')
     self.assertEqual(source.table_reference.datasetId, 'dataset')
     self.assertEqual(source.table_reference.tableId, 'table')
 
-    source = beam.io.BigQuerySource(query='my_query')
+    source = beam.io.BigQuerySource(
+        query='my_query', use_dataflow_native_source=True)
     self.assertEqual(source.query, 'my_query')
     self.assertIsNone(source.table_reference)
     self.assertTrue(source.use_legacy_sql)
 
   def test_query_only_display_data(self):
-    source = beam.io.BigQuerySource(query='my_query')
+    source = beam.io.BigQuerySource(
+        query='my_query', use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', False),
@@ -232,25 +241,36 @@ def test_query_only_display_data(self):
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_specify_query_sql_format(self):
-    source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True)
+    source = beam.io.BigQuerySource(
+        query='my_query',
+        use_standard_sql=True,
+        use_dataflow_native_source=True)
     self.assertEqual(source.query, 'my_query')
     self.assertFalse(source.use_legacy_sql)
 
   def test_specify_query_flattened_records(self):
-    source = beam.io.BigQuerySource(query='my_query', flatten_results=False)
+    source = beam.io.BigQuerySource(
+        query='my_query',
+        flatten_results=False,
+        use_dataflow_native_source=True)
     self.assertFalse(source.flatten_results)
 
   def test_specify_query_unflattened_records(self):
-    source = beam.io.BigQuerySource(query='my_query', flatten_results=True)
+    source = beam.io.BigQuerySource(
+        query='my_query', flatten_results=True, use_dataflow_native_source=True)
     self.assertTrue(source.flatten_results)
 
   def test_specify_query_without_table(self):
-    source = beam.io.BigQuerySource(query='my_query')
+    source = beam.io.BigQuerySource(
+        query='my_query', use_dataflow_native_source=True)
     self.assertEqual(source.query, 'my_query')
     self.assertIsNone(source.table_reference)
 
   def test_date_partitioned_table_name(self):
-    source = beam.io.BigQuerySource('dataset.table$20030102', validate=True)
+    source = beam.io.BigQuerySource(
+        'dataset.table$20030102',
+        validate=True,
+        use_dataflow_native_source=True)
     dd = DisplayData.create_from(source)
     expected_items = [
         DisplayDataItemMatcher('validation', True),
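
All of these tests poke at NativeSource internals (table_reference, query, display data), so each now opts into use_dataflow_native_source=True; the default return value is a composite transform without those attributes. A hedged sketch of the display-data check in isolation:

# Hedged sketch: inspecting a native source's display data directly.
import apache_beam as beam
from apache_beam.transforms.display import DisplayData

source = beam.io.BigQuerySource(
    'dataset.table', use_dataflow_native_source=True)
items = {item.key: item.value for item in DisplayData.create_from(source).items}
assert items['table'] == 'dataset.table'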

sdks/python/apache_beam/io/gcp/bigquery_tools_test.py

Lines changed: 26 additions & 12 deletions
@@ -471,7 +471,9 @@ def test_read_from_table(self):
     client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
         jobComplete=True, rows=table_rows, schema=schema)
     actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+    with beam.io.BigQuerySource(
+        'dataset.table',
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -485,7 +487,9 @@ def test_read_from_query(self):
     client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
         jobComplete=True, rows=table_rows, schema=schema)
     actual_rows = []
-    with beam.io.BigQuerySource(query='query').reader(client) as reader:
+    with beam.io.BigQuerySource(
+        query='query',
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -501,8 +505,9 @@ def test_read_from_query_sql_format(self):
     client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
         jobComplete=True, rows=table_rows, schema=schema)
     actual_rows = []
-    with beam.io.BigQuerySource(query='query',
-                                use_standard_sql=True).reader(client) as reader:
+    with beam.io.BigQuerySource(
+        query='query', use_standard_sql=True,
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -518,8 +523,9 @@ def test_read_from_query_unflatten_records(self):
     client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
         jobComplete=True, rows=table_rows, schema=schema)
     actual_rows = []
-    with beam.io.BigQuerySource(query='query',
-                                flatten_results=False).reader(client) as reader:
+    with beam.io.BigQuerySource(
+        query='query', flatten_results=False,
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -532,12 +538,13 @@ def test_using_both_query_and_table_fails(self):
         ValueError,
         r'Both a BigQuery table and a query were specified\. Please specify '
         r'only one of these'):
-      beam.io.BigQuerySource(table='dataset.table', query='query')
+      beam.io.BigQuerySource(
+          table='dataset.table', query='query', use_dataflow_native_source=True)
 
   def test_using_neither_query_nor_table_fails(self):
     with self.assertRaisesRegex(
         ValueError, r'A BigQuery table or a query must be specified'):
-      beam.io.BigQuerySource()
+      beam.io.BigQuerySource(use_dataflow_native_source=True)
 
   def test_read_from_table_as_tablerows(self):
     client = mock.Mock()
@@ -550,7 +557,9 @@ def test_read_from_table_as_tablerows(self):
     # We set the coder to TableRowJsonCoder, which is a signal that
     # the caller wants to see the rows as TableRows.
     with beam.io.BigQuerySource(
-        'dataset.table', coder=TableRowJsonCoder).reader(client) as reader:
+        'dataset.table',
+        coder=TableRowJsonCoder,
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, table_rows)
@@ -570,7 +579,9 @@ def test_read_from_table_and_job_complete_retry(self, patched_time_sleep):
         jobComplete=True, rows=table_rows, schema=schema)
     ]
     actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+    with beam.io.BigQuerySource(
+        'dataset.table',
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     self.assertEqual(actual_rows, expected_rows)
@@ -590,7 +601,9 @@ def test_read_from_table_and_multiple_pages(self):
         jobComplete=True, rows=table_rows, schema=schema)
     ]
     actual_rows = []
-    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
+    with beam.io.BigQuerySource(
+        'dataset.table',
+        use_dataflow_native_source=True).reader(client) as reader:
       for row in reader:
         actual_rows.append(row)
     # We return expected rows for each of the two pages of results so we
@@ -599,7 +612,8 @@ def test_read_from_table_and_multiple_pages(self):
 
   def test_table_schema_without_project(self):
     # Reader should pick executing project by default.
-    source = beam.io.BigQuerySource(table='mydataset.mytable')
+    source = beam.io.BigQuerySource(
+        table='mydataset.mytable', use_dataflow_native_source=True)
     options = PipelineOptions(flags=['--project', 'myproject'])
     source.pipeline_options = options
     reader = source.reader()
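
The last hunk doubles as documentation of the project fallback: a native source created without an explicit project resolves it from the pipeline options when its reader is built. A minimal sketch mirroring the test, with placeholder names:

# Sketch of the default-project fallback; dataset/table/project names
# are placeholders, as in the test above.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

source = beam.io.BigQuerySource(
    table='mydataset.mytable', use_dataflow_native_source=True)
source.pipeline_options = PipelineOptions(flags=['--project', 'myproject'])
reader = source.reader()  # should pick the executing project by default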

sdks/python/apache_beam/io/iobase.py

Lines changed: 16 additions & 7 deletions
@@ -890,6 +890,11 @@ def get_desired_chunk_size(total_size):
   def expand(self, pbegin):
     if isinstance(self.source, BoundedSource):
       return pbegin | _SDFBoundedSourceWrapper(self.source)
+    elif isinstance(self.source, ptransform.PTransform):
+      # The Read transform can also accept a full PTransform as an input
+      # rather than an actual source. If the input is a PTransform, then
+      # just apply it directly.
+      return pbegin.pipeline | self.source
     else:
       # Treat Read itself as a primitive.
       return pvalue.PCollection(
@@ -917,13 +922,17 @@ def display_data(self):
 
   def to_runner_api_parameter(self, context):
     # type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.ReadPayload]
-    return (
-        common_urns.deprecated_primitives.READ.urn,
-        beam_runner_api_pb2.ReadPayload(
-            source=self.source.to_runner_api(context),
-            is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
-            if self.source.is_bounded() else
-            beam_runner_api_pb2.IsBounded.UNBOUNDED))
+    from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+    if isinstance(self.source, (BoundedSource, dataflow_io.NativeSource)):
+      return (
+          common_urns.deprecated_primitives.READ.urn,
+          beam_runner_api_pb2.ReadPayload(
+              source=self.source.to_runner_api(context),
+              is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
+              if self.source.is_bounded() else
+              beam_runner_api_pb2.IsBounded.UNBOUNDED))
+    elif isinstance(self.source, ptransform.PTransform):
+      return self.source.to_runner_api_parameter(context)
 
   @staticmethod
   def from_runner_api_parameter(unused_ptransform, parameter, context):
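
These two changes are what keep existing pipelines working: beam.io.Read(beam.io.BigQuerySource(...)) now receives a composite PTransform rather than a Source, so Read applies it directly in expand and delegates proto translation to it. A hedged sketch of the idiom that keeps working, with a placeholder query:

# Hedged sketch: the old Read(BigQuerySource(...)) idiom still composes,
# because Read now applies a PTransform source directly. Placeholder query.
import apache_beam as beam

with beam.Pipeline() as p:
  rows = p | 'read' >> beam.io.Read(
      beam.io.BigQuerySource(query='SELECT 1 AS x', use_standard_sql=True))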
