
Commit 5806b6e

Support Beam MicrosInstant conversion to Avro Timestamp (#36605)

* support MicrosInstant conversion to Avro Timestamp
* add test
* style
* skip if no expansion jars
* trigger ITs
* style

1 parent f5b4b6d commit 5806b6e

File tree: 4 files changed, +77 -3 lines changed
Lines changed: 1 addition & 1 deletion

@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 13
+  "modification": 1
 }

sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java

Lines changed: 9 additions & 0 deletions
@@ -135,6 +135,8 @@
  *   LogicalTypes.Date            <-----> LogicalType(DATE)
  *                                <------ LogicalType(urn="beam:logical_type:date:v1")
  *   LogicalTypes.TimestampMillis <-----> DATETIME
+ *   LogicalTypes.TimestampMicros ------> Long
+ *   LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
  *   LogicalTypes.Decimal         <-----> DECIMAL
  * </pre>
  *
@@ -1181,6 +1183,9 @@ private static org.apache.avro.Schema getFieldSchema(
       baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
     } else if ("TIME".equals(identifier)) {
       baseType = LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
+    } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
+      baseType =
+          LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
     } else {
       throw new RuntimeException(
           "Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1331,6 +1336,10 @@ private static org.apache.avro.Schema getFieldSchema(
       return ((java.time.LocalDate) value).toEpochDay();
     } else if ("TIME".equals(identifier)) {
       return (int) ((Instant) value).getMillis();
+    } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
+      java.time.Instant instant = (java.time.Instant) value;
+      return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+          + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
     } else {
       throw new RuntimeException("Unhandled logical type " + identifier);
     }
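
The new branch in the value converter flattens a Beam MicrosInstant (carried as a java.time.Instant) into a single long of microseconds since the epoch; sub-microsecond precision is truncated by toMicros. A minimal standalone sketch of the same arithmetic, separate from the commit itself (the class name and sample instant are illustrative):

import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class MicrosInstantArithmeticSketch {
  public static void main(String[] args) {
    // The trailing 123 ns fall below microsecond resolution and are dropped.
    Instant instant = Instant.parse("2024-01-01T00:00:00.000000123Z");
    long micros =
        TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
            + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
    System.out.println(micros); // prints 1704067200000000
  }
}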

sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java

Lines changed: 34 additions & 0 deletions
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -1038,6 +1039,39 @@ public void testAvroBytesToRowAndRowToAvroBytesFunctions() {
     assertEquals(row, deserializedRow);
   }

+  @Test
+  public void testBeamTimestampLogicalTypeToAvro() {
+    // Tests special handling for Beam's MicrosInstant logical type.
+    // Only one way (Beam to Avro).
+
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampMicrosLT", SqlTypes.TIMESTAMP).build();
+    List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
+    fields.add(
+        new org.apache.avro.Schema.Field(
+            "timestampMicrosLT",
+            LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
+            "",
+            (Object) null));
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createRecord("topLevelRecord", null, null, false, fields);
+
+    assertEquals(avroSchema, AvroUtils.toAvroSchema(beamSchema));
+
+    java.time.Instant instant =
+        java.time.Instant.ofEpochMilli(DATE_TIME.getMillis()).plusNanos(123000);
+    Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();
+    GenericRecord avroRecord =
+        new GenericRecordBuilder(avroSchema)
+            .set(
+                "timestampMicrosLT",
+                TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+                    + TimeUnit.NANOSECONDS.toMicros(instant.getNano()))
+            .build();
+
+    assertEquals(avroRecord, AvroUtils.toGenericRecord(beamRow));
+  }
+
   @Test
   public void testNullSchemas() {
     assertEquals(
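
As the test comment and the updated javadoc table note, the mapping is deliberately one-way: Beam's micros_instant logical type becomes Avro timestamp-micros, while Avro timestamp-micros maps to a plain Beam Long. A hedged sketch of that asymmetry, assuming AvroUtils.toBeamSchema keeps the documented timestamp-micros-to-Long behavior (class and field names are illustrative):

import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;

public class MicrosInstantRoundTripSketch {
  public static void main(String[] args) {
    Schema beamSchema =
        Schema.builder().addLogicalTypeField("ts", SqlTypes.TIMESTAMP).build();
    // Beam MicrosInstant -> Avro timestamp-micros (long), per this commit.
    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
    // Avro timestamp-micros -> Beam Long (INT64), so the round trip does not
    // restore the MicrosInstant logical type.
    Schema roundTripped = AvroUtils.toBeamSchema(avroSchema);
    System.out.println(avroSchema);
    System.out.println(roundTripped);
  }
}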

sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py

Lines changed: 33 additions & 2 deletions
@@ -114,7 +114,8 @@ def setUp(self):
     self.project = self.test_pipeline.get_option('project')
     self._runner = PipelineOptions(self.args).get_all_options()['runner']

-    self.bigquery_client = BigQueryWrapper()
+    self.bigquery_client = BigQueryWrapper.from_pipeline_options(
+        self.test_pipeline.options)
     self.dataset_id = '%s_%s_%s' % (
         self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3))
     self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
@@ -154,7 +155,7 @@ def assert_iceberg_tables_created(
       self, table_prefix, storage_uri, expected_count=1):
     """Verify that Iceberg table directories are created in
     the warehouse location.
-
+
     Args:
       table_prefix: The table name prefix to look for
       storage_uri: The GCS storage URI (e.g., 'gs://bucket/path')
@@ -607,6 +608,36 @@ def test_write_with_big_lake_configuration(self):
     # Verify that the table directory was created in the warehouse location
     self.assert_iceberg_tables_created(table, big_lake_config['storageUri'])

+  def test_write_with_managed_transform(self):
+    table = 'write_with_managed_transform'
+    table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
+
+    row_elements = [
+        beam.Row(
+            my_int=e['int'],
+            my_float=e['float'],
+            my_string=e['str'],
+            my_bool=e['bool'],
+            my_bytes=e['bytes'],
+            my_timestamp=e['timestamp']) for e in self.ELEMENTS
+    ]
+
+    expected = []
+    for e in self.ELEMENTS:
+      del e["numeric"]
+      expected.append(e)
+    bq_matcher = BigqueryFullResultMatcher(
+        project=self.project,
+        query="SELECT * FROM {}.{}".format(self.dataset_id, table),
+        data=self.parse_expected_data(expected))
+
+    with beam.Pipeline(argv=self.args) as p:
+      _ = (
+          p
+          | beam.Create(row_elements)
+          | beam.managed.Write("bigquery", config={"table": table_id}))
+    hamcrest_assert(p, bq_matcher)
+


 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

0 commit comments