Skip to content

Commit 139724d

Browse files
claudevdm and Claude authored
Map TIMESTAMP(12) BQ type -> timestamp-nanos Avro type in default schemafactory (#37257)
* Map TIMESTAMP(12) BQ type -> timestamp-nanos Avro type in default schemafactory * Use default schemafactory in test. --------- Co-authored-by: Claude <cvandermerwe@google.com>
1 parent ef0a03c commit 139724d

File tree

2 files changed

+27
-8
lines changed

2 files changed

+27
-8
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,15 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
102102
// boolean
103103
return SchemaBuilder.builder().booleanType();
104104
case "TIMESTAMP":
105-
// in Extract Jobs, it always uses the Avro logical type
106-
// we may have to change this if we move to EXPORT DATA
107-
return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
105+
if (schema.getTimestampPrecision() == null || schema.getTimestampPrecision() == 6) {
106+
// in Extract Jobs, it always uses the Avro logical type
107+
// we may have to change this if we move to EXPORT DATA
108+
return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
109+
}
110+
return SchemaBuilder.builder()
111+
.longBuilder()
112+
.prop("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE)
113+
.endLong();
108114
case "DATE":
109115
if (useAvroLogicalTypes) {
110116
return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,10 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() {
408408
.name("ts_nanos")
409409
.type(longSchema)
410410
.noDefault()
411+
.name("ts_picos")
412+
.type()
413+
.stringType()
414+
.noDefault()
411415
.endRecord();
412416
}
413417

@@ -421,12 +425,12 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() {
421425
public void testWriteGenericRecordTimestampNanos() throws Exception {
422426
String tableSpec =
423427
String.format("%s:%s.%s", project, DATASET_ID, "generic_record_ts_nanos_test");
424-
425428
// Create GenericRecord with timestamp-nanos value
426429
GenericRecord record =
427430
new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA)
428431
.set(
429432
"ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000L + TEST_INSTANT.getNano())
433+
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z")
430434
.build();
431435

432436
// Write using Storage Write API with Avro format
@@ -437,7 +441,6 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
437441
"WriteGenericRecords",
438442
BigQueryIO.writeGenericRecords()
439443
.to(tableSpec)
440-
.withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA)
441444
.withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true))
442445
.useAvroLogicalTypes()
443446
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
@@ -457,12 +460,18 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
457460
.from(tableSpec));
458461

459462
PAssert.that(result)
460-
.containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
463+
.containsInAnyOrder(
464+
new TableRow()
465+
.set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
466+
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
461467
readPipeline.run().waitUntilFinish();
462468
}
463469

464470
private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA =
465-
Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build();
471+
Schema.builder()
472+
.addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS))
473+
.addField("ts_picos", Schema.FieldType.STRING)
474+
.build();
466475

467476
@Test
468477
public void testWriteBeamRowTimestampNanos() throws Exception {
@@ -472,6 +481,7 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
472481
Row row =
473482
Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA)
474483
.withFieldValue("ts_nanos", TEST_INSTANT)
484+
.withFieldValue("ts_picos", "2024-01-15T10:30:45.123456789123Z")
475485
.build();
476486

477487
// Write using Storage Write API with Beam Schema
@@ -500,7 +510,10 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
500510
.from(tableSpec));
501511

502512
PAssert.that(result)
503-
.containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
513+
.containsInAnyOrder(
514+
new TableRow()
515+
.set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
516+
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
504517
readPipeline.run().waitUntilFinish();
505518
}
506519

0 commit comments

Comments
 (0)