Commit 30fd958

feat(bigquery): add GEOGRAPHY type support for BigQuery I/O (#36121)
* feat(bigquery): add GEOGRAPHY type support for BigQuery I/O

  Add support for the BigQuery GEOGRAPHY type, which works with Well-Known Text (WKT) format. The change includes:
  - Adding GEOGRAPHY to type mappings in bigquery_tools and bigquery_schema_tools
  - Implementing GeographyType logical type in schemas.py
  - Adding comprehensive tests for GEOGRAPHY type conversion and schema integration

* fixed tests
* tests
* fixed tests
* fixes language_type
* fixed logical type
* urns
* add BQ IT
* yapf
* feat(bigquery): add project handling and test improvements

  - Add _get_project method to handle project billing in BigQuery source
  - Update tests to explicitly specify project parameter
  - Fix geography test data formats and simplify test cases
  - Add temporary storage location for file load tests

* lint
* format
* removed GeographyType for now
* restore schemas.py
* added uses_gcp_java_expansion_service
1 parent 6562b5b commit 30fd958
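
For orientation, here is a minimal sketch of what a GEOGRAPHY column could look like from the Python side, assuming (as the commit message describes) that values travel as WKT strings. The schema, field names, rows, and the `geography_fields` helper are hypothetical illustrations and are not taken from the commit.

```python
# Hypothetical table schema in the dict form commonly used for BigQuery
# schemas; the field names here are made up for illustration.
TABLE_SCHEMA = {
    'fields': [
        {'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'location', 'type': 'GEOGRAPHY', 'mode': 'NULLABLE'},
    ]
}

# Hypothetical rows: GEOGRAPHY values are plain WKT strings (or None).
ROWS = [
    {'name': 'point', 'location': 'POINT(30 10)'},
    {'name': 'line', 'location': 'LINESTRING(30 10, 10 30, 40 40)'},
    {'name': 'missing', 'location': None},
]


def geography_fields(schema):
    """Return the names of all GEOGRAPHY columns in a schema dict."""
    return [f['name'] for f in schema['fields'] if f['type'] == 'GEOGRAPHY']


if __name__ == '__main__':
    print(geography_fields(TABLE_SCHEMA))
```

Whether a given WKT string is accepted is decided by BigQuery at load or query time; this sketch does no validation of its own.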

6 files changed, +841 -4 lines changed

sdks/python/apache_beam/io/gcp/bigquery.py

Lines changed: 13 additions & 0 deletions
@@ -1029,6 +1029,16 @@ def __init__(
     self._step_name = step_name
     self._source_uuid = unique_id
 
+  def _get_project(self):
+    """Returns the project that queries and exports will be billed to."""
+    if self.pipeline_options:
+      project = self.pipeline_options.view_as(GoogleCloudOptions).project
+      if isinstance(project, vp.ValueProvider):
+        project = project.get()
+      if project:
+        return project
+    return self.project
+
   def _get_parent_project(self):
     """Returns the project that will be billed."""
     if self.temp_table:
@@ -1164,6 +1174,9 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
       self._setup_temporary_dataset(bq)
       self.table_reference = self._execute_query(bq)
 
+    if not self.table_reference.projectId:
+      self.table_reference.projectId = self._get_project()
+
     requested_session = bq_storage.types.ReadSession()
     requested_session.table = 'projects/{}/datasets/{}/tables/{}'.format(
         self.table_reference.projectId,
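
The fallback order in the two hunks above can be sketched in isolation. This is a simplified, standalone approximation and not the Beam implementation: `StaticValue` is a hypothetical stand-in for `vp.ValueProvider`, and `resolve_billing_project` and `storage_table_path` are illustrative names that do not appear in the commit.

```python
class StaticValue:
    """Hypothetical stand-in for a deferred value provider (not Beam's class)."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


def resolve_billing_project(options_project, source_project):
    """Prefer the pipeline-options project; fall back to the source's own."""
    project = options_project
    if isinstance(project, StaticValue):
        project = project.get()  # unwrap the deferred value
    return project if project else source_project


def storage_table_path(project_id, dataset_id, table_id, fallback_project):
    """Build the read-session table path, filling in a missing project."""
    if not project_id:
        project_id = fallback_project
    return 'projects/{}/datasets/{}/tables/{}'.format(
        project_id, dataset_id, table_id)


if __name__ == '__main__':
    fallback = resolve_billing_project(
        StaticValue('options-project'), 'source-project')
    # prints "projects/options-project/datasets/my_dataset/tables/my_table"
    print(storage_table_path(None, 'my_dataset', 'my_table', fallback))
```

The point of the fallback is that a table reference without a project would otherwise be formatted into the path as `projects/None/...`; filling it in before the path is built avoids that.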
