diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index 95fef3e26ca2..e3d6056a5de9 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 13
+  "modification": 1
 }
diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index 1a8cac7ffb65..38621571ca1d 100644
--- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -135,6 +135,8 @@
  *   LogicalTypes.Date                <-----> LogicalType(DATE)
  *                                    <------ LogicalType(urn="beam:logical_type:date:v1")
  *   LogicalTypes.TimestampMillis     <-----> DATETIME
+ *   LogicalTypes.TimestampMicros     ------> Long
+ *   LogicalTypes.TimestampMicros     <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
 *   LogicalTypes.Decimal             <-----> DECIMAL
 *
 *
@@ -1181,6 +1183,9 @@ private static org.apache.avro.Schema getFieldSchema(
         baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
       } else if ("TIME".equals(identifier)) {
         baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
+      } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
+        baseType =
+            LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
       } else {
         throw new RuntimeException(
             "Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1331,6 +1336,10 @@ private static org.apache.avro.Schema getFieldSchema(
         return ((java.time.LocalDate) value).toEpochDay();
       } else if ("TIME".equals(identifier)) {
         return (int) ((Instant) value).getMillis();
+      } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
+        java.time.Instant instant = (java.time.Instant) value;
+        return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+            + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
       } else {
         throw new RuntimeException("Unhandled logical type " + identifier);
       }
diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
index 7cda1e9dba5a..41a43ed850b7 100644
--- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -1038,6 +1039,39 @@ public void testAvroBytesToRowAndRowToAvroBytesFunctions() {
     assertEquals(row, deserializedRow);
   }
 
+  @Test
+  public void testBeamTimestampLogicalTypeToAvro() {
+    // Tests special handling for Beam's MicrosInstant logical type
+    // Only one way (Beam to Avro)
+
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampMicrosLT", SqlTypes.TIMESTAMP).build();
+    List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
+    fields.add(
+        new org.apache.avro.Schema.Field(
+            "timestampMicrosLT",
+            LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
+            "",
+            (Object) null));
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createRecord("topLevelRecord", null, null, false, fields);
+
+    assertEquals(avroSchema, AvroUtils.toAvroSchema(beamSchema));
+
+    java.time.Instant instant =
+        java.time.Instant.ofEpochMilli(DATE_TIME.getMillis()).plusNanos(123000);
+    Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();
+    GenericRecord avroRecord =
+        new GenericRecordBuilder(avroSchema)
+            .set(
+                "timestampMicrosLT",
+                TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+                    + TimeUnit.NANOSECONDS.toMicros(instant.getNano()))
+            .build();
+
+    assertEquals(avroRecord, AvroUtils.toGenericRecord(beamRow));
+  }
+
   @Test
   public void testNullSchemas() {
     assertEquals(
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index 51ae97b99175..d659d57aad90 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -114,7 +114,8 @@ def setUp(self):
     self.project = self.test_pipeline.get_option('project')
     self._runner = PipelineOptions(self.args).get_all_options()['runner']
 
-    self.bigquery_client = BigQueryWrapper()
+    self.bigquery_client = BigQueryWrapper.from_pipeline_options(
+        self.test_pipeline.options)
     self.dataset_id = '%s_%s_%s' % (
         self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3))
     self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
@@ -154,7 +155,7 @@ def assert_iceberg_tables_created(
       self, table_prefix, storage_uri, expected_count=1):
     """Verify that Iceberg table directories are created in the warehouse
     location.
-    
+
     Args:
       table_prefix: The table name prefix to look for
       storage_uri: The GCS storage URI (e.g., 'gs://bucket/path')
@@ -607,6 +608,36 @@ def test_write_with_big_lake_configuration(self):
     # Verify that the table directory was created in the warehouse location
     self.assert_iceberg_tables_created(table, big_lake_config['storageUri'])
 
+  def test_write_with_managed_transform(self):
+    table = 'write_with_managed_transform'
+    table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
+
+    row_elements = [
+        beam.Row(
+            my_int=e['int'],
+            my_float=e['float'],
+            my_string=e['str'],
+            my_bool=e['bool'],
+            my_bytes=e['bytes'],
+            my_timestamp=e['timestamp']) for e in self.ELEMENTS
+    ]
+
+    expected = []
+    for e in self.ELEMENTS:
+      del e["numeric"]
+      expected.append(e)
+    bq_matcher = BigqueryFullResultMatcher(
+        project=self.project,
+        query="SELECT * FROM {}.{}".format(self.dataset_id, table),
+        data=self.parse_expected_data(expected))
+
+    with beam.Pipeline(argv=self.args) as p:
+      _ = (
+          p
+          | beam.Create(row_elements)
+          | beam.managed.Write("bigquery", config={"table": table_id}))
+    hamcrest_assert(p, bq_matcher)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)