From 28652c5080129fe1cedfc20c36920fb0b0cf02ad Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 23 Oct 2025 17:30:21 -0400 Subject: [PATCH 1/6] support MicrosInstant conversion to Avro Timestamp --- .../avro/schemas/utils/AvroUtils.java | 9 +++++ .../avro/schemas/utils/AvroUtilsTest.java | 34 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 460bfaec4a36..f4db58b9a93f 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -133,6 +133,8 @@ * LogicalTypes.Date <-----> LogicalType(DATE) * <------ LogicalType(urn="beam:logical_type:date:v1") * LogicalTypes.TimestampMillis <-----> DATETIME + * LogicalTypes.TimestampMicros ------> Long + * LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1") * LogicalTypes.Decimal <-----> DECIMAL * * @@ -1179,6 +1181,9 @@ private static org.apache.avro.Schema getFieldSchema( baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT)); } else if ("TIME".equals(identifier)) { baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT)); + } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) { + baseType = + LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)); } else { throw new RuntimeException( "Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier()); @@ -1315,6 +1320,10 @@ private static org.apache.avro.Schema getFieldSchema( return ((java.time.LocalDate) value).toEpochDay(); } else if ("TIME".equals(identifier)) { return (int) ((Instant) value).getMillis(); + } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) { + java.time.Instant instant = (java.time.Instant) value; + return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + + TimeUnit.NANOSECONDS.toMicros(instant.getNano()); } else { throw new RuntimeException("Unhandled logical type " + identifier); } diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index 7cda1e9dba5a..41a43ed850b7 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -1038,6 +1039,39 @@ public void testAvroBytesToRowAndRowToAvroBytesFunctions() { assertEquals(row, deserializedRow); } + @Test + public void testBeamTimestampLogicalTypeToAvro() { + // Tests special handling for Beam's MicrosInstant logical type + // Only one way (Beam to Avro) + + Schema beamSchema = + Schema.builder().addLogicalTypeField("timestampMicrosLT", SqlTypes.TIMESTAMP).build(); + List fields = Lists.newArrayList(); + fields.add( + new org.apache.avro.Schema.Field( + 
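            // The expected field below mirrors the mapping added in AvroUtils above:
            // Beam's MicrosInstant (SqlTypes.TIMESTAMP) maps to an Avro LONG carrying the
            // timestamp-micros logical type, with each value converted as
            // epochSecond * 1_000_000 + nano / 1_000. For example, an instant at epoch
            // second 1 with 123_000 nanoseconds becomes 1_000_123 microseconds.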
"timestampMicrosLT", + LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)), + "", + (Object) null)); + org.apache.avro.Schema avroSchema = + org.apache.avro.Schema.createRecord("topLevelRecord", null, null, false, fields); + + assertEquals(avroSchema, AvroUtils.toAvroSchema(beamSchema)); + + java.time.Instant instant = + java.time.Instant.ofEpochMilli(DATE_TIME.getMillis()).plusNanos(123000); + Row beamRow = Row.withSchema(beamSchema).addValue(instant).build(); + GenericRecord avroRecord = + new GenericRecordBuilder(avroSchema) + .set( + "timestampMicrosLT", + TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + + TimeUnit.NANOSECONDS.toMicros(instant.getNano())) + .build(); + + assertEquals(avroRecord, AvroUtils.toGenericRecord(beamRow)); + } + @Test public void testNullSchemas() { assertEquals( From bc5f1de01d15ea603d6d65a25c54ffbd8fd0be22 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 19 Nov 2025 11:57:10 -0500 Subject: [PATCH 2/6] add test --- .../io/external/xlang_bigqueryio_it_test.py | 612 +++++++++--------- 1 file changed, 321 insertions(+), 291 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index 51ae97b99175..e6c4f18de5d7 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -58,55 +58,55 @@ @pytest.mark.uses_gcp_java_expansion_service -@unittest.skipUnless( - os.environ.get('EXPANSION_JARS'), - "EXPANSION_JARS environment var is not provided, " - "indicating that jars have not been built") +# @unittest.skipUnless( +# os.environ.get('EXPANSION_JARS'), +# "EXPANSION_JARS environment var is not provided, " +# "indicating that jars have not been built") class BigQueryXlangStorageWriteIT(unittest.TestCase): BIGQUERY_DATASET = 'python_xlang_storage_write' ELEMENTS = [ - # (int, float, numeric, string, bool, bytes, timestamp) - { - "int": 1, - "float": 0.1, - "numeric": Decimal("1.11"), - "str": "a", - "bool": True, - "bytes": b'a', - "timestamp": Timestamp(1000, 100) - }, - { - "int": 2, - "float": 0.2, - "numeric": Decimal("2.22"), - "str": "b", - "bool": False, - "bytes": b'b', - "timestamp": Timestamp(2000, 200) - }, - { - "int": 3, - "float": 0.3, - "numeric": Decimal("3.33"), - "str": "c", - "bool": True, - "bytes": b'd', - "timestamp": Timestamp(3000, 300) - }, - { - "int": 4, - "float": 0.4, - "numeric": Decimal("4.44"), - "str": "d", - "bool": False, - "bytes": b'd', - "timestamp": Timestamp(4000, 400) - } + # (int, float, numeric, string, bool, bytes, timestamp) + { + "int": 1, + "float": 0.1, + "numeric": Decimal("1.11"), + "str": "a", + "bool": True, + "bytes": b'a', + "timestamp": Timestamp(1000, 100) + }, + { + "int": 2, + "float": 0.2, + "numeric": Decimal("2.22"), + "str": "b", + "bool": False, + "bytes": b'b', + "timestamp": Timestamp(2000, 200) + }, + { + "int": 3, + "float": 0.3, + "numeric": Decimal("3.33"), + "str": "c", + "bool": True, + "bytes": b'd', + "timestamp": Timestamp(3000, 300) + }, + { + "int": 4, + "float": 0.4, + "numeric": Decimal("4.44"), + "str": "d", + "bool": False, + "bytes": b'd', + "timestamp": Timestamp(4000, 400) + } ] ALL_TYPES_SCHEMA = ( - "int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING," - "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP") + "int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING," + "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP") def setUp(self): self.test_pipeline = 
TestPipeline(is_integration_test=True) @@ -114,26 +114,26 @@ def setUp(self): self.project = self.test_pipeline.get_option('project') self._runner = PipelineOptions(self.args).get_all_options()['runner'] - self.bigquery_client = BigQueryWrapper() + self.bigquery_client = BigQueryWrapper.from_pipeline_options(self.test_pipeline.options) self.dataset_id = '%s_%s_%s' % ( - self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3)) + self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3)) self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id) _LOGGER.info( - "Created dataset %s in project %s", self.dataset_id, self.project) + "Created dataset %s in project %s", self.dataset_id, self.project) def tearDown(self): try: _LOGGER.info( - "Deleting dataset %s in project %s", self.dataset_id, self.project) + "Deleting dataset %s in project %s", self.dataset_id, self.project) self.bigquery_client._delete_dataset( - project_id=self.project, - dataset_id=self.dataset_id, - delete_contents=True) + project_id=self.project, + dataset_id=self.dataset_id, + delete_contents=True) except HttpError: _LOGGER.debug( - 'Failed to clean up dataset %s in project %s', - self.dataset_id, - self.project) + 'Failed to clean up dataset %s in project %s', + self.dataset_id, + self.project) def parse_expected_data(self, expected_elements): if not isinstance(expected_elements, list): @@ -145,16 +145,16 @@ def parse_expected_data(self, expected_elements): if isinstance(val, Timestamp): # BigQuery matcher query returns a datetime.datetime object values[i] = val.to_utc_datetime().replace( - tzinfo=datetime.timezone.utc) + tzinfo=datetime.timezone.utc) data.append(tuple(values)) return data def assert_iceberg_tables_created( - self, table_prefix, storage_uri, expected_count=1): + self, table_prefix, storage_uri, expected_count=1): """Verify that Iceberg table directories are created in the warehouse location. 
- + Args: table_prefix: The table name prefix to look for storage_uri: The GCS storage URI (e.g., 'gs://bucket/path') @@ -162,7 +162,7 @@ def assert_iceberg_tables_created( """ if GcsIO is None: _LOGGER.warning( - "GcsIO not available, skipping warehouse location verification") + "GcsIO not available, skipping warehouse location verification") return gcs_io = GcsIO() @@ -180,8 +180,8 @@ def assert_iceberg_tables_created( # Following the pattern: # {base_prefix}/{project}/{dataset}/{table_prefix} search_prefix = ( - f"{base_prefix}/" - f"{self.project}/{self.dataset_id}/{table_prefix}") + f"{base_prefix}/" + f"{self.project}/{self.dataset_id}/{table_prefix}") # List objects in the bucket with the constructed prefix try: @@ -190,40 +190,40 @@ def assert_iceberg_tables_created( if object_count < expected_count: raise AssertionError( - f"Expected at least {expected_count} objects in warehouse " - f"location gs://{bucket_name}/{search_prefix}, but found " - f"{object_count}") + f"Expected at least {expected_count} objects in warehouse " + f"location gs://{bucket_name}/{search_prefix}, but found " + f"{object_count}") _LOGGER.info( - "Successfully verified %s objects created in " - "warehouse location gs://%s/%s", - object_count, - bucket_name, - search_prefix) + "Successfully verified %s objects created in " + "warehouse location gs://%s/%s", + object_count, + bucket_name, + search_prefix) except Exception as e: raise AssertionError( - f"Failed to verify table creation in warehouse location " - f"gs://{bucket_name}/{search_prefix}: {str(e)}") + f"Failed to verify table creation in warehouse location " + f"gs://{bucket_name}/{search_prefix}: {str(e)}") def run_storage_write_test( - self, table_name, items, schema, use_at_least_once=False): + self, table_name, items, schema, use_at_least_once=False): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name) bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM %s" % '{}.{}'.format(self.dataset_id, table_name), - data=self.parse_expected_data(items)) + project=self.project, + query="SELECT * FROM %s" % '{}.{}'.format(self.dataset_id, table_name), + data=self.parse_expected_data(items)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(items) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=schema, - use_at_least_once=use_at_least_once)) + p + | beam.Create(items) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=schema, + use_at_least_once=use_at_least_once)) hamcrest_assert(p, bq_matcher) def test_all_types(self): @@ -235,46 +235,46 @@ def test_with_at_least_once_semantics(self): table_name = "with_at_least_once_semantics" schema = self.ALL_TYPES_SCHEMA self.run_storage_write_test( - table_name, self.ELEMENTS, schema, use_at_least_once=True) + table_name, self.ELEMENTS, schema, use_at_least_once=True) def test_nested_records_and_lists(self): table_name = "nested_records_and_lists" schema = { - "fields": [{ - "name": "repeated_int", "type": "INTEGER", "mode": "REPEATED" + "fields": [{ + "name": "repeated_int", "type": "INTEGER", "mode": "REPEATED" + }, + { + "name": "struct", + "type": "STRUCT", + "fields": [{ + "name": "nested_int", "type": "INTEGER" + }, { + "name": "nested_str", "type": "STRING" + }] }, - { - "name": "struct", - "type": "STRUCT", - "fields": [{ - "name": "nested_int", "type": "INTEGER" - }, { - "name": "nested_str", "type": "STRING" - 
}] - }, - { - "name": "repeated_struct", - "type": "STRUCT", - "mode": "REPEATED", - "fields": [{ - "name": "nested_numeric", "type": "NUMERIC" - }, { - "name": "nested_bytes", "type": "BYTES" - }] - }] + { + "name": "repeated_struct", + "type": "STRUCT", + "mode": "REPEATED", + "fields": [{ + "name": "nested_numeric", "type": "NUMERIC" + }, { + "name": "nested_bytes", "type": "BYTES" + }] + }] } items = [{ - "repeated_int": [1, 2, 3], - "struct": { - "nested_int": 1, "nested_str": "a" - }, - "repeated_struct": [{ - "nested_numeric": Decimal("1.23"), "nested_bytes": b'a' - }, - { - "nested_numeric": Decimal("3.21"), - "nested_bytes": b'aa' - }] + "repeated_int": [1, 2, 3], + "struct": { + "nested_int": 1, "nested_str": "a" + }, + "repeated_struct": [{ + "nested_numeric": Decimal("1.23"), "nested_bytes": b'a' + }, + { + "nested_numeric": Decimal("3.21"), + "nested_bytes": b'aa' + }] }] self.run_storage_write_test(table_name, items, schema) @@ -284,26 +284,26 @@ def test_write_with_beam_rows(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) row_elements = [ - beam.Row( - my_int=e['int'], - my_float=e['float'], - my_numeric=e['numeric'], - my_string=e['str'], - my_bool=e['bool'], - my_bytes=e['bytes'], - my_timestamp=e['timestamp']) for e in self.ELEMENTS + beam.Row( + my_int=e['int'], + my_float=e['float'], + my_numeric=e['numeric'], + my_string=e['str'], + my_bool=e['bool'], + my_bytes=e['bytes'], + my_timestamp=e['timestamp']) for e in self.ELEMENTS ] bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(self.ELEMENTS)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(self.ELEMENTS)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(row_elements) - | StorageWriteToBigQuery(table=table_id)) + p + | beam.Create(row_elements) + | StorageWriteToBigQuery(table=table_id)) hamcrest_assert(p, bq_matcher) def test_write_with_clustering(self): @@ -311,23 +311,23 @@ def test_write_with_clustering(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(self.ELEMENTS)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(self.ELEMENTS)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | "Create test data" >> beam.Create(self.ELEMENTS) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=self.ALL_TYPES_SCHEMA, - create_disposition='CREATE_IF_NEEDED', - write_disposition='WRITE_TRUNCATE', - additional_bq_parameters={'clustering': { - 'fields': ['int'] - }})) + p + | "Create test data" >> beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + create_disposition='CREATE_IF_NEEDED', + write_disposition='WRITE_TRUNCATE', + additional_bq_parameters={'clustering': { + 'fields': ['int'] + }})) # After pipeline finishes, verify clustering is applied table = self.bigquery_client.get_table(self.project, self.dataset_id, table) @@ -341,39 +341,39 @@ def test_write_with_beam_rows_cdc(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) expected_data_on_bq = [ - # (name, 
value) - { - "name": "cdc_test", - "value": 5, - } + # (name, value) + { + "name": "cdc_test", + "value": 5, + } ] rows_with_cdc = [ - beam.Row( - row_mutation_info=beam.Row( - mutation_type="UPSERT", change_sequence_number="AAA/2"), - record=beam.Row(name="cdc_test", value=5)), - beam.Row( - row_mutation_info=beam.Row( - mutation_type="UPSERT", change_sequence_number="AAA/1"), - record=beam.Row(name="cdc_test", value=3)) + beam.Row( + row_mutation_info=beam.Row( + mutation_type="UPSERT", change_sequence_number="AAA/2"), + record=beam.Row(name="cdc_test", value=5)), + beam.Row( + row_mutation_info=beam.Row( + mutation_type="UPSERT", change_sequence_number="AAA/1"), + record=beam.Row(name="cdc_test", value=3)) ] bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(expected_data_on_bq)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(expected_data_on_bq)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(rows_with_cdc) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=True, - use_cdc_writes=True, - primary_key=["name"])) + p + | beam.Create(rows_with_cdc) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + use_at_least_once=True, + use_cdc_writes=True, + primary_key=["name"])) hamcrest_assert(p, bq_matcher) def test_write_with_dicts_cdc(self): @@ -381,81 +381,81 @@ def test_write_with_dicts_cdc(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) expected_data_on_bq = [ - # (name, value) - { - "name": "cdc_test", - "value": 5, - } + # (name, value) + { + "name": "cdc_test", + "value": 5, + } ] data_with_cdc = [ - # record: (name, value) - { - 'row_mutation_info': { - 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/2' - }, - 'record': { - 'name': 'cdc_test', 'value': 5 - } + # record: (name, value) + { + 'row_mutation_info': { + 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/2' }, - { - 'row_mutation_info': { - 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/1' - }, - 'record': { - 'name': 'cdc_test', 'value': 3 - } + 'record': { + 'name': 'cdc_test', 'value': 5 + } + }, + { + 'row_mutation_info': { + 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/1' + }, + 'record': { + 'name': 'cdc_test', 'value': 3 } + } ] schema = { - "fields": [ - # include both record and mutation info fields as part of the schema + "fields": [ + # include both record and mutation info fields as part of the schema + { + "name": "row_mutation_info", + "type": "STRUCT", + "fields": [ + # setting both fields are required { - "name": "row_mutation_info", - "type": "STRUCT", - "fields": [ - # setting both fields are required - { - "name": "mutation_type", - "type": "STRING", - "mode": "REQUIRED" - }, - { - "name": "change_sequence_number", - "type": "STRING", - "mode": "REQUIRED" - } - ] + "name": "mutation_type", + "type": "STRING", + "mode": "REQUIRED" }, { - "name": "record", - "type": "STRUCT", - "fields": [{ - "name": "name", "type": "STRING" - }, { - "name": "value", "type": "INTEGER" - }] + "name": "change_sequence_number", + "type": "STRING", + "mode": "REQUIRED" } - ] + ] + }, + { + "name": "record", + "type": "STRUCT", + "fields": [{ + "name": "name", "type": "STRING" + }, { + "name": "value", "type": "INTEGER" + }] + } + ] } 
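    # Both input dicts above share the primary key value "cdc_test", so this test
    # expects the UPSERT with the higher change_sequence_number ("AAA/2", value 5)
    # to be the row that survives on the table, which is what expected_data_on_bq
    # encodes.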
bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(expected_data_on_bq)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(expected_data_on_bq)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(data_with_cdc) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=True, - use_cdc_writes=True, - schema=schema, - primary_key=["name"])) + p + | beam.Create(data_with_cdc) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + use_at_least_once=True, + use_cdc_writes=True, + schema=schema, + primary_key=["name"])) hamcrest_assert(p, bq_matcher) def test_write_to_dynamic_destinations(self): @@ -464,22 +464,22 @@ def test_write_to_dynamic_destinations(self): tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] bq_matchers = [ - BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM %s" % tables[i], - data=self.parse_expected_data(self.ELEMENTS[i])) - for i in range(len(tables)) + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % tables[i], + data=self.parse_expected_data(self.ELEMENTS[i])) + for i in range(len(tables)) ] with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(self.ELEMENTS) - | beam.io.WriteToBigQuery( - table=lambda record: spec_with_project + str(record['int']), - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=self.ALL_TYPES_SCHEMA, - use_at_least_once=False)) + p + | beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=lambda record: spec_with_project + str(record['int']), + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + use_at_least_once=False)) hamcrest_assert(p, all_of(*bq_matchers)) def test_write_to_dynamic_destinations_with_beam_rows(self): @@ -488,32 +488,32 @@ def test_write_to_dynamic_destinations_with_beam_rows(self): tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] bq_matchers = [ - BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM %s" % tables[i], - data=self.parse_expected_data(self.ELEMENTS[i])) - for i in range(len(tables)) + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % tables[i], + data=self.parse_expected_data(self.ELEMENTS[i])) + for i in range(len(tables)) ] row_elements = [ - beam.Row( - my_int=e['int'], - my_float=e['float'], - my_numeric=e['numeric'], - my_string=e['str'], - my_bool=e['bool'], - my_bytes=e['bytes'], - my_timestamp=e['timestamp']) for e in self.ELEMENTS + beam.Row( + my_int=e['int'], + my_float=e['float'], + my_numeric=e['numeric'], + my_string=e['str'], + my_bool=e['bool'], + my_bytes=e['bytes'], + my_timestamp=e['timestamp']) for e in self.ELEMENTS ] with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(row_elements) - | beam.io.WriteToBigQuery( - table=lambda record: spec_with_project + str(record.my_int), - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=False)) + p + | beam.Create(row_elements) + | beam.io.WriteToBigQuery( + table=lambda record: spec_with_project + str(record.my_int), + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + use_at_least_once=False)) hamcrest_assert(p, all_of(*bq_matchers)) def run_streaming(self, table_name, 
num_streams=0, use_at_least_once=False): @@ -522,38 +522,38 @@ def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name) bq_matcher = BigqueryFullResultStreamingMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table_name), - data=self.parse_expected_data(self.ELEMENTS)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table_name), + data=self.parse_expected_data(self.ELEMENTS)) args = self.test_pipeline.get_full_options_as_args( - on_success_matcher=bq_matcher, - streaming=True, - allow_unsafe_triggers=True) + on_success_matcher=bq_matcher, + streaming=True, + allow_unsafe_triggers=True) auto_sharding = (num_streams == 0) with beam.Pipeline(argv=args) as p: _ = ( - p - | PeriodicImpulse(0, 4, 1) - | beam.Map(lambda t: elements[t]) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=schema, - triggering_frequency=1, - with_auto_sharding=auto_sharding, - num_storage_api_streams=num_streams, - use_at_least_once=use_at_least_once)) + p + | PeriodicImpulse(0, 4, 1) + | beam.Map(lambda t: elements[t]) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=schema, + triggering_frequency=1, + with_auto_sharding=auto_sharding, + num_storage_api_streams=num_streams, + use_at_least_once=use_at_least_once)) hamcrest_assert(p, bq_matcher) def skip_if_not_dataflow_runner(self) -> bool: # skip if dataflow runner is not specified if not self._runner or "dataflowrunner" not in self._runner.lower(): self.skipTest( - "Streaming with exactly-once route has the requirement " - "`beam:requirement:pardo:on_window_expiration:v1`, " - "which is currently only supported by the Dataflow runner") + "Streaming with exactly-once route has the requirement " + "`beam:requirement:pardo:on_window_expiration:v1`, " + "which is currently only supported by the Dataflow runner") def test_streaming_with_fixed_num_streams(self): self.skip_if_not_dataflow_runner() @@ -561,8 +561,8 @@ def test_streaming_with_fixed_num_streams(self): self.run_streaming(table_name=table, num_streams=4) @unittest.skip( - "Streaming to the Storage Write API sink with autosharding is broken " - "with Dataflow Runner V2.") + "Streaming to the Storage Write API sink with autosharding is broken " + "with Dataflow Runner V2.") def test_streaming_with_auto_sharding(self): self.skip_if_not_dataflow_runner() table = 'streaming_with_auto_sharding' @@ -579,34 +579,64 @@ def test_write_with_big_lake_configuration(self): # BigLake configuration with required parameters (matching Java test) big_lake_config = { - 'connectionId': 'projects/apache-beam-testing/locations/us/connections/apache-beam-testing-storageapi-biglake-nodelete', # pylint: disable=line-too-long - 'storageUri': 'gs://apache-beam-testing-bq-biglake/BigQueryXlangStorageWriteIT', # pylint: disable=line-too-long - 'fileFormat': 'parquet', - 'tableFormat': 'iceberg' + 'connectionId': 'projects/apache-beam-testing/locations/us/connections/apache-beam-testing-storageapi-biglake-nodelete', # pylint: disable=line-too-long + 'storageUri': 'gs://apache-beam-testing-bq-biglake/BigQueryXlangStorageWriteIT', # pylint: disable=line-too-long + 'fileFormat': 'parquet', + 'tableFormat': 'iceberg' } bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - 
data=self.parse_expected_data(self.ELEMENTS)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(self.ELEMENTS)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | "Create test data" >> beam.Create(self.ELEMENTS) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=self.ALL_TYPES_SCHEMA, - create_disposition='CREATE_IF_NEEDED', - write_disposition='WRITE_TRUNCATE', - big_lake_configuration=big_lake_config)) + p + | "Create test data" >> beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + create_disposition='CREATE_IF_NEEDED', + write_disposition='WRITE_TRUNCATE', + big_lake_configuration=big_lake_config)) hamcrest_assert(p, bq_matcher) # Verify that the table directory was created in the warehouse location self.assert_iceberg_tables_created(table, big_lake_config['storageUri']) + def test_write_with_managed_transform(self): + table = 'write_with_managed_transform' + table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) + + row_elements = [ + beam.Row( + my_int=e['int'], + my_float=e['float'], + my_string=e['str'], + my_bool=e['bool'], + my_bytes=e['bytes'], + my_timestamp=e['timestamp']) for e in self.ELEMENTS + ] + + expected = [] + for e in self.ELEMENTS: + del e["numeric"] + expected.append(e) + bq_matcher = BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(expected)) + + with beam.Pipeline(argv=self.args) as p: + _ = ( + p + | beam.Create(row_elements) + | beam.managed.Write("bigquery", config={"table": table_id})) + hamcrest_assert(p, bq_matcher) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 6d2ae91d31c94b1e4590dc5927dc477c4a8a3610 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 19 Nov 2025 11:57:52 -0500 Subject: [PATCH 3/6] style --- .../io/external/xlang_bigqueryio_it_test.py | 591 +++++++++--------- 1 file changed, 296 insertions(+), 295 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index e6c4f18de5d7..c817244d96d1 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -66,47 +66,47 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase): BIGQUERY_DATASET = 'python_xlang_storage_write' ELEMENTS = [ - # (int, float, numeric, string, bool, bytes, timestamp) - { - "int": 1, - "float": 0.1, - "numeric": Decimal("1.11"), - "str": "a", - "bool": True, - "bytes": b'a', - "timestamp": Timestamp(1000, 100) - }, - { - "int": 2, - "float": 0.2, - "numeric": Decimal("2.22"), - "str": "b", - "bool": False, - "bytes": b'b', - "timestamp": Timestamp(2000, 200) - }, - { - "int": 3, - "float": 0.3, - "numeric": Decimal("3.33"), - "str": "c", - "bool": True, - "bytes": b'd', - "timestamp": Timestamp(3000, 300) - }, - { - "int": 4, - "float": 0.4, - "numeric": Decimal("4.44"), - "str": "d", - "bool": False, - "bytes": b'd', - "timestamp": Timestamp(4000, 400) - } + # (int, float, numeric, string, bool, bytes, timestamp) + { + "int": 1, + "float": 0.1, + "numeric": Decimal("1.11"), + "str": "a", + "bool": True, + "bytes": b'a', + "timestamp": Timestamp(1000, 100) + }, + { + "int": 2, + "float": 0.2, + 
"numeric": Decimal("2.22"), + "str": "b", + "bool": False, + "bytes": b'b', + "timestamp": Timestamp(2000, 200) + }, + { + "int": 3, + "float": 0.3, + "numeric": Decimal("3.33"), + "str": "c", + "bool": True, + "bytes": b'd', + "timestamp": Timestamp(3000, 300) + }, + { + "int": 4, + "float": 0.4, + "numeric": Decimal("4.44"), + "str": "d", + "bool": False, + "bytes": b'd', + "timestamp": Timestamp(4000, 400) + } ] ALL_TYPES_SCHEMA = ( - "int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING," - "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP") + "int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING," + "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP") def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) @@ -114,26 +114,27 @@ def setUp(self): self.project = self.test_pipeline.get_option('project') self._runner = PipelineOptions(self.args).get_all_options()['runner'] - self.bigquery_client = BigQueryWrapper.from_pipeline_options(self.test_pipeline.options) + self.bigquery_client = BigQueryWrapper.from_pipeline_options( + self.test_pipeline.options) self.dataset_id = '%s_%s_%s' % ( - self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3)) + self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3)) self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id) _LOGGER.info( - "Created dataset %s in project %s", self.dataset_id, self.project) + "Created dataset %s in project %s", self.dataset_id, self.project) def tearDown(self): try: _LOGGER.info( - "Deleting dataset %s in project %s", self.dataset_id, self.project) + "Deleting dataset %s in project %s", self.dataset_id, self.project) self.bigquery_client._delete_dataset( - project_id=self.project, - dataset_id=self.dataset_id, - delete_contents=True) + project_id=self.project, + dataset_id=self.dataset_id, + delete_contents=True) except HttpError: _LOGGER.debug( - 'Failed to clean up dataset %s in project %s', - self.dataset_id, - self.project) + 'Failed to clean up dataset %s in project %s', + self.dataset_id, + self.project) def parse_expected_data(self, expected_elements): if not isinstance(expected_elements, list): @@ -145,13 +146,13 @@ def parse_expected_data(self, expected_elements): if isinstance(val, Timestamp): # BigQuery matcher query returns a datetime.datetime object values[i] = val.to_utc_datetime().replace( - tzinfo=datetime.timezone.utc) + tzinfo=datetime.timezone.utc) data.append(tuple(values)) return data def assert_iceberg_tables_created( - self, table_prefix, storage_uri, expected_count=1): + self, table_prefix, storage_uri, expected_count=1): """Verify that Iceberg table directories are created in the warehouse location. 
@@ -162,7 +163,7 @@ def assert_iceberg_tables_created( """ if GcsIO is None: _LOGGER.warning( - "GcsIO not available, skipping warehouse location verification") + "GcsIO not available, skipping warehouse location verification") return gcs_io = GcsIO() @@ -180,8 +181,8 @@ def assert_iceberg_tables_created( # Following the pattern: # {base_prefix}/{project}/{dataset}/{table_prefix} search_prefix = ( - f"{base_prefix}/" - f"{self.project}/{self.dataset_id}/{table_prefix}") + f"{base_prefix}/" + f"{self.project}/{self.dataset_id}/{table_prefix}") # List objects in the bucket with the constructed prefix try: @@ -190,40 +191,40 @@ def assert_iceberg_tables_created( if object_count < expected_count: raise AssertionError( - f"Expected at least {expected_count} objects in warehouse " - f"location gs://{bucket_name}/{search_prefix}, but found " - f"{object_count}") + f"Expected at least {expected_count} objects in warehouse " + f"location gs://{bucket_name}/{search_prefix}, but found " + f"{object_count}") _LOGGER.info( - "Successfully verified %s objects created in " - "warehouse location gs://%s/%s", - object_count, - bucket_name, - search_prefix) + "Successfully verified %s objects created in " + "warehouse location gs://%s/%s", + object_count, + bucket_name, + search_prefix) except Exception as e: raise AssertionError( - f"Failed to verify table creation in warehouse location " - f"gs://{bucket_name}/{search_prefix}: {str(e)}") + f"Failed to verify table creation in warehouse location " + f"gs://{bucket_name}/{search_prefix}: {str(e)}") def run_storage_write_test( - self, table_name, items, schema, use_at_least_once=False): + self, table_name, items, schema, use_at_least_once=False): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name) bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM %s" % '{}.{}'.format(self.dataset_id, table_name), - data=self.parse_expected_data(items)) + project=self.project, + query="SELECT * FROM %s" % '{}.{}'.format(self.dataset_id, table_name), + data=self.parse_expected_data(items)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(items) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=schema, - use_at_least_once=use_at_least_once)) + p + | beam.Create(items) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=schema, + use_at_least_once=use_at_least_once)) hamcrest_assert(p, bq_matcher) def test_all_types(self): @@ -235,46 +236,46 @@ def test_with_at_least_once_semantics(self): table_name = "with_at_least_once_semantics" schema = self.ALL_TYPES_SCHEMA self.run_storage_write_test( - table_name, self.ELEMENTS, schema, use_at_least_once=True) + table_name, self.ELEMENTS, schema, use_at_least_once=True) def test_nested_records_and_lists(self): table_name = "nested_records_and_lists" schema = { - "fields": [{ - "name": "repeated_int", "type": "INTEGER", "mode": "REPEATED" - }, - { - "name": "struct", - "type": "STRUCT", - "fields": [{ - "name": "nested_int", "type": "INTEGER" - }, { - "name": "nested_str", "type": "STRING" - }] + "fields": [{ + "name": "repeated_int", "type": "INTEGER", "mode": "REPEATED" }, - { - "name": "repeated_struct", - "type": "STRUCT", - "mode": "REPEATED", - "fields": [{ - "name": "nested_numeric", "type": "NUMERIC" - }, { - "name": "nested_bytes", "type": "BYTES" - }] - }] + { + "name": "struct", + "type": "STRUCT", + "fields": [{ + "name": 
"nested_int", "type": "INTEGER" + }, { + "name": "nested_str", "type": "STRING" + }] + }, + { + "name": "repeated_struct", + "type": "STRUCT", + "mode": "REPEATED", + "fields": [{ + "name": "nested_numeric", "type": "NUMERIC" + }, { + "name": "nested_bytes", "type": "BYTES" + }] + }] } items = [{ - "repeated_int": [1, 2, 3], - "struct": { - "nested_int": 1, "nested_str": "a" - }, - "repeated_struct": [{ - "nested_numeric": Decimal("1.23"), "nested_bytes": b'a' - }, - { - "nested_numeric": Decimal("3.21"), - "nested_bytes": b'aa' - }] + "repeated_int": [1, 2, 3], + "struct": { + "nested_int": 1, "nested_str": "a" + }, + "repeated_struct": [{ + "nested_numeric": Decimal("1.23"), "nested_bytes": b'a' + }, + { + "nested_numeric": Decimal("3.21"), + "nested_bytes": b'aa' + }] }] self.run_storage_write_test(table_name, items, schema) @@ -284,26 +285,26 @@ def test_write_with_beam_rows(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) row_elements = [ - beam.Row( - my_int=e['int'], - my_float=e['float'], - my_numeric=e['numeric'], - my_string=e['str'], - my_bool=e['bool'], - my_bytes=e['bytes'], - my_timestamp=e['timestamp']) for e in self.ELEMENTS + beam.Row( + my_int=e['int'], + my_float=e['float'], + my_numeric=e['numeric'], + my_string=e['str'], + my_bool=e['bool'], + my_bytes=e['bytes'], + my_timestamp=e['timestamp']) for e in self.ELEMENTS ] bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(self.ELEMENTS)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(self.ELEMENTS)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(row_elements) - | StorageWriteToBigQuery(table=table_id)) + p + | beam.Create(row_elements) + | StorageWriteToBigQuery(table=table_id)) hamcrest_assert(p, bq_matcher) def test_write_with_clustering(self): @@ -311,23 +312,23 @@ def test_write_with_clustering(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(self.ELEMENTS)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(self.ELEMENTS)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | "Create test data" >> beam.Create(self.ELEMENTS) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=self.ALL_TYPES_SCHEMA, - create_disposition='CREATE_IF_NEEDED', - write_disposition='WRITE_TRUNCATE', - additional_bq_parameters={'clustering': { - 'fields': ['int'] - }})) + p + | "Create test data" >> beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + create_disposition='CREATE_IF_NEEDED', + write_disposition='WRITE_TRUNCATE', + additional_bq_parameters={'clustering': { + 'fields': ['int'] + }})) # After pipeline finishes, verify clustering is applied table = self.bigquery_client.get_table(self.project, self.dataset_id, table) @@ -341,39 +342,39 @@ def test_write_with_beam_rows_cdc(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) expected_data_on_bq = [ - # (name, value) - { - "name": "cdc_test", - "value": 5, - } + # (name, value) + { + "name": "cdc_test", + "value": 5, + } ] 
rows_with_cdc = [ - beam.Row( - row_mutation_info=beam.Row( - mutation_type="UPSERT", change_sequence_number="AAA/2"), - record=beam.Row(name="cdc_test", value=5)), - beam.Row( - row_mutation_info=beam.Row( - mutation_type="UPSERT", change_sequence_number="AAA/1"), - record=beam.Row(name="cdc_test", value=3)) + beam.Row( + row_mutation_info=beam.Row( + mutation_type="UPSERT", change_sequence_number="AAA/2"), + record=beam.Row(name="cdc_test", value=5)), + beam.Row( + row_mutation_info=beam.Row( + mutation_type="UPSERT", change_sequence_number="AAA/1"), + record=beam.Row(name="cdc_test", value=3)) ] bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(expected_data_on_bq)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(expected_data_on_bq)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(rows_with_cdc) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=True, - use_cdc_writes=True, - primary_key=["name"])) + p + | beam.Create(rows_with_cdc) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + use_at_least_once=True, + use_cdc_writes=True, + primary_key=["name"])) hamcrest_assert(p, bq_matcher) def test_write_with_dicts_cdc(self): @@ -381,81 +382,81 @@ def test_write_with_dicts_cdc(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) expected_data_on_bq = [ - # (name, value) - { - "name": "cdc_test", - "value": 5, - } + # (name, value) + { + "name": "cdc_test", + "value": 5, + } ] data_with_cdc = [ - # record: (name, value) - { - 'row_mutation_info': { - 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/2' - }, - 'record': { - 'name': 'cdc_test', 'value': 5 - } - }, - { - 'row_mutation_info': { - 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/1' + # record: (name, value) + { + 'row_mutation_info': { + 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/2' + }, + 'record': { + 'name': 'cdc_test', 'value': 5 + } }, - 'record': { - 'name': 'cdc_test', 'value': 3 + { + 'row_mutation_info': { + 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/1' + }, + 'record': { + 'name': 'cdc_test', 'value': 3 + } } - } ] schema = { - "fields": [ - # include both record and mutation info fields as part of the schema - { - "name": "row_mutation_info", - "type": "STRUCT", - "fields": [ - # setting both fields are required + "fields": [ + # include both record and mutation info fields as part of the schema { - "name": "mutation_type", - "type": "STRING", - "mode": "REQUIRED" + "name": "row_mutation_info", + "type": "STRUCT", + "fields": [ + # setting both fields are required + { + "name": "mutation_type", + "type": "STRING", + "mode": "REQUIRED" + }, + { + "name": "change_sequence_number", + "type": "STRING", + "mode": "REQUIRED" + } + ] }, { - "name": "change_sequence_number", - "type": "STRING", - "mode": "REQUIRED" + "name": "record", + "type": "STRUCT", + "fields": [{ + "name": "name", "type": "STRING" + }, { + "name": "value", "type": "INTEGER" + }] } - ] - }, - { - "name": "record", - "type": "STRUCT", - "fields": [{ - "name": "name", "type": "STRING" - }, { - "name": "value", "type": "INTEGER" - }] - } - ] + ] } bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, 
table), - data=self.parse_expected_data(expected_data_on_bq)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(expected_data_on_bq)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(data_with_cdc) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=True, - use_cdc_writes=True, - schema=schema, - primary_key=["name"])) + p + | beam.Create(data_with_cdc) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + use_at_least_once=True, + use_cdc_writes=True, + schema=schema, + primary_key=["name"])) hamcrest_assert(p, bq_matcher) def test_write_to_dynamic_destinations(self): @@ -464,22 +465,22 @@ def test_write_to_dynamic_destinations(self): tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] bq_matchers = [ - BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM %s" % tables[i], - data=self.parse_expected_data(self.ELEMENTS[i])) - for i in range(len(tables)) + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % tables[i], + data=self.parse_expected_data(self.ELEMENTS[i])) + for i in range(len(tables)) ] with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(self.ELEMENTS) - | beam.io.WriteToBigQuery( - table=lambda record: spec_with_project + str(record['int']), - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=self.ALL_TYPES_SCHEMA, - use_at_least_once=False)) + p + | beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=lambda record: spec_with_project + str(record['int']), + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + use_at_least_once=False)) hamcrest_assert(p, all_of(*bq_matchers)) def test_write_to_dynamic_destinations_with_beam_rows(self): @@ -488,32 +489,32 @@ def test_write_to_dynamic_destinations_with_beam_rows(self): tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] bq_matchers = [ - BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM %s" % tables[i], - data=self.parse_expected_data(self.ELEMENTS[i])) - for i in range(len(tables)) + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % tables[i], + data=self.parse_expected_data(self.ELEMENTS[i])) + for i in range(len(tables)) ] row_elements = [ - beam.Row( - my_int=e['int'], - my_float=e['float'], - my_numeric=e['numeric'], - my_string=e['str'], - my_bool=e['bool'], - my_bytes=e['bytes'], - my_timestamp=e['timestamp']) for e in self.ELEMENTS + beam.Row( + my_int=e['int'], + my_float=e['float'], + my_numeric=e['numeric'], + my_string=e['str'], + my_bool=e['bool'], + my_bytes=e['bytes'], + my_timestamp=e['timestamp']) for e in self.ELEMENTS ] with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(row_elements) - | beam.io.WriteToBigQuery( - table=lambda record: spec_with_project + str(record.my_int), - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=False)) + p + | beam.Create(row_elements) + | beam.io.WriteToBigQuery( + table=lambda record: spec_with_project + str(record.my_int), + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + use_at_least_once=False)) hamcrest_assert(p, all_of(*bq_matchers)) def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): @@ -522,38 +523,38 @@ def run_streaming(self, table_name, num_streams=0, 
use_at_least_once=False): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name) bq_matcher = BigqueryFullResultStreamingMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table_name), - data=self.parse_expected_data(self.ELEMENTS)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table_name), + data=self.parse_expected_data(self.ELEMENTS)) args = self.test_pipeline.get_full_options_as_args( - on_success_matcher=bq_matcher, - streaming=True, - allow_unsafe_triggers=True) + on_success_matcher=bq_matcher, + streaming=True, + allow_unsafe_triggers=True) auto_sharding = (num_streams == 0) with beam.Pipeline(argv=args) as p: _ = ( - p - | PeriodicImpulse(0, 4, 1) - | beam.Map(lambda t: elements[t]) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=schema, - triggering_frequency=1, - with_auto_sharding=auto_sharding, - num_storage_api_streams=num_streams, - use_at_least_once=use_at_least_once)) + p + | PeriodicImpulse(0, 4, 1) + | beam.Map(lambda t: elements[t]) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=schema, + triggering_frequency=1, + with_auto_sharding=auto_sharding, + num_storage_api_streams=num_streams, + use_at_least_once=use_at_least_once)) hamcrest_assert(p, bq_matcher) def skip_if_not_dataflow_runner(self) -> bool: # skip if dataflow runner is not specified if not self._runner or "dataflowrunner" not in self._runner.lower(): self.skipTest( - "Streaming with exactly-once route has the requirement " - "`beam:requirement:pardo:on_window_expiration:v1`, " - "which is currently only supported by the Dataflow runner") + "Streaming with exactly-once route has the requirement " + "`beam:requirement:pardo:on_window_expiration:v1`, " + "which is currently only supported by the Dataflow runner") def test_streaming_with_fixed_num_streams(self): self.skip_if_not_dataflow_runner() @@ -561,8 +562,8 @@ def test_streaming_with_fixed_num_streams(self): self.run_streaming(table_name=table, num_streams=4) @unittest.skip( - "Streaming to the Storage Write API sink with autosharding is broken " - "with Dataflow Runner V2.") + "Streaming to the Storage Write API sink with autosharding is broken " + "with Dataflow Runner V2.") def test_streaming_with_auto_sharding(self): self.skip_if_not_dataflow_runner() table = 'streaming_with_auto_sharding' @@ -586,21 +587,21 @@ def test_write_with_big_lake_configuration(self): } bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(self.ELEMENTS)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(self.ELEMENTS)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | "Create test data" >> beam.Create(self.ELEMENTS) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - schema=self.ALL_TYPES_SCHEMA, - create_disposition='CREATE_IF_NEEDED', - write_disposition='WRITE_TRUNCATE', - big_lake_configuration=big_lake_config)) + p + | "Create test data" >> beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + create_disposition='CREATE_IF_NEEDED', + write_disposition='WRITE_TRUNCATE', + big_lake_configuration=big_lake_config)) 
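    # After the pipeline finishes, the matcher below checks the row contents, and
    # assert_iceberg_tables_created() additionally verifies that at least one object
    # was written under {storageUri}/{project}/{dataset}/{table} in the GCS
    # warehouse, i.e. that the Iceberg table directory was actually materialized.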
hamcrest_assert(p, bq_matcher) @@ -612,13 +613,13 @@ def test_write_with_managed_transform(self): table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) row_elements = [ - beam.Row( - my_int=e['int'], - my_float=e['float'], - my_string=e['str'], - my_bool=e['bool'], - my_bytes=e['bytes'], - my_timestamp=e['timestamp']) for e in self.ELEMENTS + beam.Row( + my_int=e['int'], + my_float=e['float'], + my_string=e['str'], + my_bool=e['bool'], + my_bytes=e['bytes'], + my_timestamp=e['timestamp']) for e in self.ELEMENTS ] expected = [] @@ -626,15 +627,15 @@ def test_write_with_managed_transform(self): del e["numeric"] expected.append(e) bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(expected)) + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table), + data=self.parse_expected_data(expected)) with beam.Pipeline(argv=self.args) as p: _ = ( - p - | beam.Create(row_elements) - | beam.managed.Write("bigquery", config={"table": table_id})) + p + | beam.Create(row_elements) + | beam.managed.Write("bigquery", config={"table": table_id})) hamcrest_assert(p, bq_matcher) From e70c217a70915b3b05b8d44273eca4144a5fb85d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 19 Nov 2025 11:58:19 -0500 Subject: [PATCH 4/6] skip if no expansion jars --- .../apache_beam/io/external/xlang_bigqueryio_it_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index c817244d96d1..cfb19c5d79df 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -58,10 +58,10 @@ @pytest.mark.uses_gcp_java_expansion_service -# @unittest.skipUnless( -# os.environ.get('EXPANSION_JARS'), -# "EXPANSION_JARS environment var is not provided, " -# "indicating that jars have not been built") +@unittest.skipUnless( + os.environ.get('EXPANSION_JARS'), + "EXPANSION_JARS environment var is not provided, " + "indicating that jars have not been built") class BigQueryXlangStorageWriteIT(unittest.TestCase): BIGQUERY_DATASET = 'python_xlang_storage_write' From fe171e6d17490a24b79ec142451f49999a71f092 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 19 Nov 2025 11:58:59 -0500 Subject: [PATCH 5/6] trigger ITs --- .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index 2504db607e46..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 12 + "modification": 1 } From 2aaedf34494babe514fbd4471250e44cb16f41dc Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 19 Nov 2025 12:23:39 -0500 Subject: [PATCH 6/6] style --- .../apache_beam/io/external/xlang_bigqueryio_it_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index cfb19c5d79df..d659d57aad90 100644 
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -580,10 +580,10 @@ def test_write_with_big_lake_configuration(self): # BigLake configuration with required parameters (matching Java test) big_lake_config = { - 'connectionId': 'projects/apache-beam-testing/locations/us/connections/apache-beam-testing-storageapi-biglake-nodelete', # pylint: disable=line-too-long - 'storageUri': 'gs://apache-beam-testing-bq-biglake/BigQueryXlangStorageWriteIT', # pylint: disable=line-too-long - 'fileFormat': 'parquet', - 'tableFormat': 'iceberg' + 'connectionId': 'projects/apache-beam-testing/locations/us/connections/apache-beam-testing-storageapi-biglake-nodelete', # pylint: disable=line-too-long + 'storageUri': 'gs://apache-beam-testing-bq-biglake/BigQueryXlangStorageWriteIT', # pylint: disable=line-too-long + 'fileFormat': 'parquet', + 'tableFormat': 'iceberg' } bq_matcher = BigqueryFullResultMatcher(